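Ingress is one of the more important resources in Kubernetes. An Ingress controller usually sits at the edge of the cluster and is responsible for exposing cluster services to the outside. Although other Ingress controllers exist, such as Traefik and Contour (see here for a list of common controllers), the official Ingress Nginx Controller is still the most widely used in production. This article walks through its source code to explore how it works and to demystify how Ingress objects are turned into the nginx.conf file.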

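This analysis focuses on how the Nginx configuration file is generated. The source code itself is not the main concern, so some less important code is skipped along the way.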

The source is based on ingress-nginx-controller-v1.1.0.

First, let's look at the definition of the controller itself.

NGINXController

type NGINXController struct {
	cfg *Configuration // configuration parameters of the controller itself

	recorder record.EventRecorder // event recording

	syncQueue *task.Queue

	syncStatus status.Syncer // status reporting; for what it does, see: https://blog.dianduidian.com/post/kubernetes-ingress%E7%8A%B6%E6%80%81%E4%B8%8A%E6%8A%A5%E6%9C%BA%E5%88%B6/

	syncRateLimiter flowcontrol.RateLimiter // rate limiter

	// stopLock is used to enforce that only a single call to Stop send at
	// a given time. We allow stopping through an HTTP endpoint and
	// allowing concurrent stoppers leads to stack traces.
	stopLock *sync.Mutex

	stopCh   chan struct{}
	updateCh *channels.RingChannel

	// ngxErrCh is used to detect errors with the NGINX processes
	ngxErrCh chan error

	// runningConfig contains the running configuration in the Backend
	runningConfig *ingress.Configuration // the configuration currently running, used to detect whether the configuration has changed

	t ngx_template.Writer // template rendering

	resolver []net.IP // nameserver addresses read from /etc/resolv.conf, used when generating nginx.conf, e.g. resolver 127.0.0.1 [::1]:5353;

	isIPV6Enabled bool

	isShuttingDown bool

	Proxy *TCPProxy // TCP proxy, used when SSLPassthrough is enabled

	store store.Storer // local store interface

	metricCollector    metric.Collector // metrics related
	admissionCollector metric.Collector // same as above

	validationWebhookServer *http.Server // admission control

	command NginxExecTester // interface for operating Nginx, used to start the Nginx process and test the configuration file
}
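
To make the role of the resolver field more concrete, here is a minimal sketch of how nameserver entries in resolv.conf-style text could be turned into an nginx resolver directive. The helpers parseNameservers and resolverDirective are hypothetical names invented for this illustration and are not part of ingress-nginx; the real controller obtains the addresses through dns.GetSystemNameServers and renders the directive in its template.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
	"strings"
)

// parseNameservers extracts the IP addresses of all "nameserver" lines
// from resolv.conf-style content. Hypothetical helper for illustration only.
func parseNameservers(content string) []net.IP {
	var ips []net.IP
	scanner := bufio.NewScanner(strings.NewReader(content))
	for scanner.Scan() {
		fields := strings.Fields(scanner.Text())
		if len(fields) < 2 || fields[0] != "nameserver" {
			continue // comments, "search", "options" and blank lines are skipped
		}
		if ip := net.ParseIP(fields[1]); ip != nil {
			ips = append(ips, ip)
		}
	}
	return ips
}

// resolverDirective renders an nginx "resolver" directive.
// IPv6 addresses are wrapped in square brackets, as nginx requires.
func resolverDirective(ips []net.IP) string {
	parts := make([]string, 0, len(ips))
	for _, ip := range ips {
		if ip.To4() != nil {
			parts = append(parts, ip.String())
		} else {
			parts = append(parts, "["+ip.String()+"]")
		}
	}
	return "resolver " + strings.Join(parts, " ") + ";"
}

func main() {
	conf := "# sample file\nnameserver 127.0.0.1\nnameserver ::1\nsearch example.local\n"
	fmt.Println(resolverDirective(parseNameservers(conf)))
}
```

The sketch ignores ports and resolver options; it only shows why the controller keeps the addresses as []net.IP, since IPv4 and IPv6 entries have to be formatted differently.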

Next, let's look at the controller's own configuration.

Configuration

type Configuration struct {
	APIServerHost string
	RootCAFile    string

	KubeConfigFile string

	Client clientset.Interface

	ResyncPeriod time.Duration

	ConfigMapName  string
	DefaultService string

	Namespace string

	WatchNamespaceSelector labels.Selector

	// +optional
	TCPConfigMapName string
	// +optional
	UDPConfigMapName string

	DefaultSSLCertificate string

	// +optional
	PublishService       string
	PublishStatusAddress string

	UpdateStatus           bool
	UseNodeInternalIP      bool
	ElectionID             string
	UpdateStatusOnShutdown bool

	HealthCheckHost string
	ListenPorts     *ngx_config.ListenPorts

	DisableServiceExternalName bool

	EnableSSLPassthrough bool

	EnableProfiling bool

	EnableMetrics  bool
	MetricsPerHost bool

	FakeCertificate *ingress.SSLCert

	SyncRateLimit float32

	DisableCatchAll bool

	IngressClassConfiguration *ingressclass.IngressClassConfiguration

	ValidationWebhook         string
	ValidationWebhookCertPath string
	ValidationWebhookKeyPath  string
	DisableFullValidationTest bool

	GlobalExternalAuth  *ngx_config.GlobalExternalAuth
	MaxmindEditionFiles *[]string

	MonitorMaxBatchSize int

	ShutdownGracePeriod int
}

These settings are passed in on the command line when the controller starts.

bash-5.1$ /nginx-ingress-controller --help
-------------------------------------------------------------------------------
NGINX Ingress controller
  Release:       1.0.0-dev
  Build:         git-9960efe1e
  Repository:    https://github.com/kubernetes/ingress-nginx.git
  nginx version: nginx/1.19.9

-------------------------------------------------------------------------------

Usage of :
      --add_dir_header                          If true, adds the file directory to the header of the log messages
      --alsologtostderr                         log to standard error as well as files
      --annotations-prefix string               Prefix of the Ingress annotations specific to the NGINX controller. (default "nginx.ingress.kubernetes.io")
      --apiserver-host string                   Address of the Kubernetes API server.
                                                Takes the form "protocol://address:port". If not specified, it is assumed the
                                                program runs inside a Kubernetes cluster and local discovery is attempted.
      --certificate-authority string            Path to a cert file for the certificate authority. This certificate is used
                                                only when the flag --apiserver-host is specified.
      --configmap string                        Name of the ConfigMap containing custom global configurations for the controller.
      --controller-class string                 Ingress Class Controller value this Ingress satisfies.
                                                The class of an Ingress object is set using the field IngressClassName in Kubernetes clusters version v1.19.0 or higher. The .spec.controller value of the IngressClass
                                                referenced in an Ingress Object should be the same value specified here to make this object be watched. (default "k8s.io/ingress-nginx")
      --default-backend-service string          Service used to serve HTTP requests not matching any known server name (catch-all).
                                                Takes the form "namespace/name". The controller configures NGINX to forward
                                                requests to the first port of this Service.
      --default-server-port int                 Port to use for exposing the default server (catch-all). (default 8181)
      --default-ssl-certificate string          Secret containing a SSL certificate to be used by the default HTTPS server (catch-all).
                                                Takes the form "namespace/name".
      --disable-catch-all                       Disable support for catch-all Ingresses
      --disable-full-test                       Disable full test of all merged ingresses at the admission stage and tests the template of the ingress being created or updated  (full test of all ingresses is enabled by default)
      --disable-svc-external-name               Disable support for Services of type ExternalName
      --election-id string                      Election id to use for Ingress status updates. (default "ingress-controller-leader")
      --enable-metrics                          Enables the collection of NGINX metrics (default true)
      --enable-ssl-chain-completion             Autocomplete SSL certificate chains with missing intermediate CA certificates.
                                                Certificates uploaded to Kubernetes must have the "Authority Information Access" X.509 v3
                                                extension for this to succeed.
      --enable-ssl-passthrough                  Enable SSL Passthrough.
      --health-check-path string                URL path of the health check endpoint.
                                                Configured inside the NGINX status server. All requests received on the port
                                                defined by the healthz-port parameter are forwarded internally to this path. (default "/healthz")
      --health-check-timeout int                Time limit, in seconds, for a probe to health-check-path to succeed. (default 10)
      --healthz-host string                     Address to bind the healthz endpoint.
      --healthz-port int                        Port to use for the healthz endpoint. (default 10254)
      --http-port int                           Port to use for servicing HTTP traffic. (default 80)
      --https-port int                          Port to use for servicing HTTPS traffic. (default 443)
      --ingress-class string                    [IN DEPRECATION] Name of the ingress class this controller satisfies.
                                                The class of an Ingress object is set using the annotation "kubernetes.io/ingress.class" (deprecated).
                                                The parameter --controller-class has precedence over this. (default "nginx")
      --ingress-class-by-name                   Define if Ingress Controller should watch for Ingress Class by Name together with Controller Class
      --kubeconfig string                       Path to a kubeconfig file containing authorization and API server information.
      --length-buckets float64Slice             Set of buckets which will be used for prometheus histogram metrics such as RequestLength, ResponseLength (default [10.000000,20.000000,30.000000,40.000000,50.000000,60.000000,70.000000,80.000000,90.000000,100.000000])
      --log_backtrace_at traceLocation          when logging hits line file:N, emit a stack trace (default :0)
      --log_dir string                          If non-empty, write log files in this directory
      --log_file string                         If non-empty, use this log file
      --log_file_max_size uint                  Defines the maximum size a log file can grow to. Unit is megabytes. If the value is 0, the maximum file size is unlimited. (default 1800)
      --logtostderr                             log to standard error instead of files (default true)
      --maxmind-edition-ids string              Maxmind edition ids to download GeoLite2 Databases. (default "GeoLite2-City,GeoLite2-ASN")
      --maxmind-license-key string              Maxmind license key to download GeoLite2 Databases.
                                                https://blog.maxmind.com/2019/12/18/significant-changes-to-accessing-and-using-geolite2-databases
      --maxmind-mirror string                   Maxmind mirror url (example: http://geoip.local/databases
      --maxmind-retries-count int               Number of attempts to download the GeoIP DB. (default 1)
      --maxmind-retries-timeout duration        Maxmind downloading delay between 1st and 2nd attempt, 0s - do not retry to download if something went wrong.
      --metrics-per-host                        Export metrics per-host (default true)
      --monitor-max-batch-size int              Max batch size of NGINX metrics (default 10000)
      --one_output                              If true, only write logs to their native severity level (vs also writing to each lower severity level)
      --post-shutdown-grace-period int          Seconds to wait after the nginx process has stopped before controller exits. (default 10)
      --profiler-port int                       Port to use for expose the ingress controller Go profiler when it is enabled. (default 10245)
      --profiling                               Enable profiling via web interface host:port/debug/pprof/ (default true)
      --publish-service string                  Service fronting the Ingress controller.
                                                Takes the form "namespace/name". When used together with update-status, the
                                                controller mirrors the address of this service's endpoints to the load-balancer
                                                status of all Ingress objects it satisfies.
      --publish-status-address string           Customized address (or addresses, separated by comma) to set as the load-balancer status of Ingress objects this controller satisfies.
                                                Requires the update-status parameter.
      --report-node-internal-ip-address         Set the load-balancer status of Ingress objects to internal Node addresses instead of external.
                                                Requires the update-status parameter.
      --shutdown-grace-period int               Seconds to wait after receiving the shutdown signal, before stopping the nginx process.
      --size-buckets float64Slice               Set of buckets which will be used for prometheus histogram metrics such as BytesSent (default [10.000000,100.000000,1000.000000,10000.000000,100000.000000,1000000.000000,10000000.000000])
      --skip_headers                            If true, avoid header prefixes in the log messages
      --skip_log_headers                        If true, avoid headers when opening log files
      --ssl-passthrough-proxy-port int          Port to use internally for SSL Passthrough. (default 442)
      --status-port int                         Port to use for the lua HTTP endpoint configuration. (default 10246)
      --status-update-interval int              Time interval in seconds in which the status should check if an update is required. Default is 60 seconds (default 60)
      --stderrthreshold severity                logs at or above this threshold go to stderr (default 2)
      --stream-port int                         Port to use for the lua TCP/UDP endpoint configuration. (default 10247)
      --sync-period duration                    Period at which the controller forces the repopulation of its local object stores. Disabled by default.
      --sync-rate-limit float32                 Define the sync frequency upper limit (default 0.3)
      --tcp-services-configmap string           Name of the ConfigMap containing the definition of the TCP services to expose.
                                                The key in the map indicates the external port to be used. The value is a
                                                reference to a Service in the form "namespace/name:port", where "port" can
                                                either be a port number or name. TCP ports 80 and 443 are reserved by the
                                                controller for servicing HTTP traffic.
      --time-buckets float64Slice               Set of buckets which will be used for prometheus histogram metrics such as RequestTime, ResponseTime (default [0.005000,0.010000,0.025000,0.050000,0.100000,0.250000,0.500000,1.000000,2.500000,5.000000,10.000000])
      --udp-services-configmap string           Name of the ConfigMap containing the definition of the UDP services to expose.
                                                The key in the map indicates the external port to be used. The value is a
                                                reference to a Service in the form "namespace/name:port", where "port" can
                                                either be a port name or number.
      --update-status                           Update the load-balancer status of Ingress objects this controller satisfies.
                                                Requires setting the publish-service parameter to a valid Service reference. (default true)
      --update-status-on-shutdown               Update the load-balancer status of Ingress objects when the controller shuts down.
                                                Requires the update-status parameter. (default true)
  -v, --v Level                                 number for the log level verbosity
      --validating-webhook string               The address to start an admission controller on to validate incoming ingresses.
                                                Takes the form "<host>:port". If not provided, no admission controller is started.
      --validating-webhook-certificate string   The path of the validating webhook certificate PEM.
      --validating-webhook-key string           The path of the validating webhook key PEM.
      --version                                 Show release information about the NGINX Ingress controller and exit.
      --vmodule moduleSpec                      comma-separated list of pattern=N settings for file-filtered logging
      --watch-ingress-without-class             Define if Ingress Controller should also watch for Ingresses without an IngressClass or the annotation specified
      --watch-namespace string                  Namespace the controller watches for updates to Kubernetes objects.
                                                This includes Ingresses, Services and all configuration resources. All
                                                namespaces are watched if this parameter is left empty.
      --watch-namespace-selector string         Selector selects namespaces the controller watches for updates to Kubernetes objects.
pflag: help requested
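
As a rough illustration of how such command-line flags end up in a configuration struct, here is a minimal sketch using only the standard library flag package. The real controller uses pflag and fills the much larger Configuration struct; the controllerFlags type and the parseFlags function below are hypothetical, and only three flags are modeled, with names and defaults taken from the help output above (sync-rate-limit is a float64 here because the standard library has no float32 flag).

```go
package main

import (
	"flag"
	"fmt"
	"io"
)

// controllerFlags is a hypothetical, heavily reduced stand-in for Configuration.
type controllerFlags struct {
	ConfigMapName string
	SyncRateLimit float64
	UpdateStatus  bool
}

// parseFlags parses the given arguments into a controllerFlags value.
func parseFlags(args []string) (*controllerFlags, error) {
	fs := flag.NewFlagSet("nginx-ingress-controller", flag.ContinueOnError)
	fs.SetOutput(io.Discard) // keep usage text out of the example output

	cfg := &controllerFlags{}
	fs.StringVar(&cfg.ConfigMapName, "configmap", "",
		"Name of the ConfigMap containing custom global configurations.")
	fs.Float64Var(&cfg.SyncRateLimit, "sync-rate-limit", 0.3,
		"Define the sync frequency upper limit.")
	fs.BoolVar(&cfg.UpdateStatus, "update-status", true,
		"Update the load-balancer status of Ingress objects.")

	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	return cfg, nil
}

func main() {
	cfg, err := parseFlags([]string{
		"--configmap=ingress-nginx/controller",
		"--update-status=false",
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", *cfg)
}
```

Flags that are not given keep their defaults, which is why a typical deployment only needs to set a handful of them.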

NewNGINXController

Controller initialization

func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXController {
	// event recording
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
		Interface: config.Client.CoreV1().Events(config.Namespace),
	})

	// read the system nameservers
	h, err := dns.GetSystemNameServers()
	if err != nil {
		klog.Warningf("Error reading system nameservers: %v", err)
	}

	n := &NGINXController{
		isIPV6Enabled: ing_net.IsIPv6Enabled(),

		resolver:        h,
		cfg:             config,
		syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1),

		recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
			Component: "nginx-ingress-controller",
		}),

		stopCh:   make(chan struct{}),
		updateCh: channels.NewRingChannel(1024),

		ngxErrCh: make(chan error),

		stopLock: &sync.Mutex{},

		runningConfig: new(ingress.Configuration),

		Proxy: &TCPProxy{},

		metricCollector: mc,

		command: NewNginxCommand(),
	}

	if n.cfg.ValidationWebhook != "" {
		n.validationWebhookServer = &http.Server{
			Addr:      config.ValidationWebhook,
			Handler:   adm_controller.NewAdmissionControllerServer(&adm_controller.IngressAdmission{Checker: n}),
			TLSConfig: ssl.NewTLSListener(n.cfg.ValidationWebhookCertPath, n.cfg.ValidationWebhookKeyPath).TLSConfig(),
			// disable http/2
			// https://github.com/kubernetes/kubernetes/issues/80313
			// https://github.com/kubernetes/ingress-nginx/issues/6323#issuecomment-737239159
			TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
		}
	}

	// initialize the store
	n.store = store.New(
		config.Namespace,
		config.WatchNamespaceSelector,
		config.ConfigMapName,
		config.TCPConfigMapName,
		config.UDPConfigMapName,
		config.DefaultSSLCertificate,
		config.ResyncPeriod,
		config.Client,
		n.updateCh,
		config.DisableCatchAll,
		config.IngressClassConfiguration)

	// initialize the task queue; n.syncIngress is the task handler and is the key piece
	n.syncQueue = task.NewTaskQueue(n.syncIngress)

	if config.UpdateStatus {
		n.syncStatus = status.NewStatusSyncer(status.Config{
			Client:                 config.Client,
			PublishService:         config.PublishService,
			PublishStatusAddress:   config.PublishStatusAddress,
			IngressLister:          n.store,
			UpdateStatusOnShutdown: config.UpdateStatusOnShutdown,
			UseNodeInternalIP:      config.UseNodeInternalIP,
		})
	} else {
		klog.Warning("Update of Ingress status is disabled (flag --update-status)")
	}

	onTemplateChange := func() {
		template, err := ngx_template.NewTemplate(nginx.TemplatePath)
		if err != nil {
			// this error is different from the rest because it must be clear why nginx is not working
			klog.ErrorS(err, "Error loading new template")
			return
		}

		n.t = template
		klog.InfoS("New NGINX configuration template loaded")
		n.syncQueue.EnqueueTask(task.GetDummyObject("template-change"))
	}

	ngxTpl, err := ngx_template.NewTemplate(nginx.TemplatePath)
	if err != nil {
		klog.Fatalf("Invalid NGINX configuration template: %v", err)
	}

	n.t = ngxTpl // template

	// watch the nginx.tmpl template file for changes
	_, err = watch.NewFileWatcher(nginx.TemplatePath, onTemplateChange)
	if err != nil {
		klog.Fatalf("Error creating file watcher for %v: %v", nginx.TemplatePath, err)
	}

	filesToWatch := []string{}
	err = filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}

		if info.IsDir() {
			return nil
		}

		filesToWatch = append(filesToWatch, path)
		return nil
	})

	if err != nil {
		klog.Fatalf("Error creating file watchers: %v", err)
	}

	for _, f := range filesToWatch {
		_, err = watch.NewFileWatcher(f, func() {
			klog.InfoS("File changed detected. Reloading NGINX", "path", f)
			// enqueue a new task to regenerate nginx.conf
			n.syncQueue.EnqueueTask(task.GetDummyObject("file-change"))
		})
		if err != nil {
			klog.Fatalf("Error creating file watcher for %v: %v", f, err)
		}
	}

	return n
}
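
The call flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1) above limits how often a sync may run: with the default --sync-rate-limit of 0.3 and a burst of 1, that is roughly one sync every 3.3 seconds. The following is a simplified token-bucket sketch to show the idea. It is not the client-go implementation; the tokenBucket type and its methods are hypothetical, and the clock is passed in as plain seconds so that the behavior is easy to follow.

```go
package main

import "fmt"

// tokenBucket is a simplified, single-goroutine token bucket.
// It only illustrates the idea behind the controller's sync rate limiter.
type tokenBucket struct {
	qps    float64 // tokens added per second
	burst  float64 // maximum number of stored tokens
	tokens float64 // tokens currently available
	last   float64 // time of the previous call, in seconds
}

// newTokenBucket returns a bucket that starts full.
func newTokenBucket(qps float64, burst int) *tokenBucket {
	return &tokenBucket{qps: qps, burst: float64(burst), tokens: float64(burst)}
}

// tryAccept refills the bucket for the time elapsed since the previous call
// and reports whether one token could be taken at time now (in seconds).
func (b *tokenBucket) tryAccept(now float64) bool {
	b.tokens += (now - b.last) * b.qps
	if b.tokens > b.burst {
		b.tokens = b.burst
	}
	b.last = now
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

func main() {
	b := newTokenBucket(0.3, 1)
	for _, now := range []float64{0, 1, 2, 5} {
		fmt.Printf("t=%vs accepted=%v\n", now, b.tryAccept(now))
	}
}
```

In this run the request at t=0 is accepted, the ones at t=1 and t=2 are rejected because less than one token has accumulated, and the one at t=5 is accepted again. A burst of 1 means a flood of resource changes cannot trigger back-to-back reloads.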

Start()

Starting the controller

func (n *NGINXController) Start() {
	klog.InfoS("Starting NGINX Ingress controller")
	// 1. Sync and watch Kubernetes resources through the Informer mechanism
	n.store.Run(n.stopCh)

	// ... leader-election code for multiple replicas is omitted; it is not the focus here

	cmd := n.command.ExecCommand()

	// Give the nginx process its own pgid so that it does not exit on a SIGTERM sent to the controller process.
	cmd.SysProcAttr = &syscall.SysProcAttr{
		Setpgid: true,
		Pgid:    0,
	}

	if n.cfg.EnableSSLPassthrough {
		n.setupSSLProxy()
	}

	klog.InfoS("Starting NGINX process")
	n.start(cmd) // 2. Start the nginx process

	// 3. Start the task queue
	go n.syncQueue.Run(time.Second, n.stopCh)
	// 4. This is the first start, so trigger one configuration sync
	n.syncQueue.EnqueueTask(task.GetDummyObject("initial-sync"))

	// Temporary configuration files kept after an error are cleaned up periodically (every 5 minutes)
	go func() {
		for {
			time.Sleep(5 * time.Minute)
			err := cleanTempNginxCfg()
			if err != nil {
				klog.ErrorS(err, "Unexpected error removing temporal configuration files")
			}
		}
	}()
	// admission webhook related
	if n.validationWebhookServer != nil {
		// ...
	}

	for {
		// multiplex over several channels
		select {
		case err := <-n.ngxErrCh:
			if n.isShuttingDown {
				return
			}

			// if the nginx master process dies, the workers continue to process requests
			// until the failure of the configured livenessProbe and restart of the pod.
			if process.IsRespawnIfRequired(err) {
				return
			}

		// receive the Events produced by the informer event-handler callbacks
		case event := <-n.updateCh.Out():
			if n.isShuttingDown {
				break
			}

			if evt, ok := event.(store.Event); ok {
				klog.V(3).InfoS("Event received", "type", evt.Type, "object", evt.Obj)
				if evt.Type == store.ConfigurationEvent {
					// TODO: is this necessary? Consider removing this special case
					n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
					continue
				}

				// create a task
				n.syncQueue.EnqueueSkippableTask(evt.Obj)
			} else {
				klog.Warningf("Unexpected event type received %T", event)
			}
		case <-n.stopCh:
			return
		}
	}
}

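If you are familiar with the Informer mechanism or have read the source of other controllers, it is easy to understand what the Ingress controller does after it starts.

  1. List/Watch the relevant Kubernetes resource objects. Which resources are listed and watched is revealed below in the analysis of Storer.
  2. Start the Nginx process (Nginx is started by the controller).
  3. Start the task queue, which processes the resources obtained through List/Watch. How exactly they are processed requires analyzing syncQueue; the guess for now is that it is related to generating the Nginx configuration file.
  4. Because this is the first start and Nginx does not yet have a generated configuration file, a configuration update has to be triggered manually once.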
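
The event loop at the end of Start() can be reduced to the following sketch. It is a simplified model under stated assumptions, not the controller's code: the event type, its string values and the dispatch function are hypothetical, and the task queue is replaced by a slice that records which task keys would have been enqueued.

```go
package main

import "fmt"

// event is a hypothetical, reduced version of store.Event.
type event struct {
	Type string // "CONFIGURATION" marks a ConfigMap change in this sketch
	Obj  string // key of the object that changed
}

// dispatch reads events until the channel is closed or stop is signalled and
// returns the task keys it would have enqueued, in order.
func dispatch(events <-chan event, stop <-chan struct{}) []string {
	var tasks []string
	for {
		select {
		case evt, ok := <-events:
			if !ok {
				return tasks // producer closed the channel
			}
			if evt.Type == "CONFIGURATION" {
				// ConfigMap changes become a fixed dummy task.
				tasks = append(tasks, "configmap-change")
				continue
			}
			// Every other change is enqueued under the object's own key.
			tasks = append(tasks, evt.Obj)
		case <-stop:
			return tasks
		}
	}
}

func main() {
	events := make(chan event, 3)
	events <- event{Type: "CREATE", Obj: "default/web"}
	events <- event{Type: "CONFIGURATION", Obj: "ingress-nginx/controller"}
	events <- event{Type: "UPDATE", Obj: "default/api"}
	close(events)

	fmt.Println(dispatch(events, make(chan struct{})))
}
```

The point of the pattern is that informer callbacks never regenerate the configuration themselves. They only emit events, and a single loop turns those events into queued tasks that the sync handler processes later.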

Storer

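The "Store" caches the resource objects obtained through List/Watch. Because the Ingress controller needs to List/Watch many resources, such as Ingresses, ConfigMaps and Services, the "store" operations are abstracted into an interface.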

type Storer interface {
	// GetBackendConfiguration returns the nginx configuration stored in a configmap
	GetBackendConfiguration() ngx_config.Configuration

	// GetConfigMap returns the ConfigMap matching key.
	GetConfigMap(key string) (*corev1.ConfigMap, error)

	// GetSecret returns the Secret matching key.
	GetSecret(key string) (*corev1.Secret, error)

	// GetService returns the Service matching key.
	GetService(key string) (*corev1.Service, error)

	// GetServiceEndpoints returns the Endpoints of a Service matching key.
	GetServiceEndpoints(key string) (*corev1.Endpoints, error)

	// ListIngresses returns a list of all Ingresses in the store.
	ListIngresses() []*ingress.Ingress

	// GetLocalSSLCert returns the local copy of a SSLCert
	GetLocalSSLCert(name string) (*ingress.SSLCert, error)

	// ListLocalSSLCerts returns the list of local SSLCerts
	ListLocalSSLCerts() []*ingress.SSLCert

	// GetAuthCertificate resolves a given secret name into an SSL certificate.
	// The secret must contain 3 keys named:
	//   ca.crt: contains the certificate chain used for authentication
	GetAuthCertificate(string) (*resolver.AuthSSLCert, error)

	// GetDefaultBackend returns the default backend configuration
	GetDefaultBackend() defaults.Backend

	// Run initiates the synchronization of the controllers
	Run(stopCh chan struct{})
}
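
One benefit of hiding the caches behind an interface is that callers depend only on lookups by key, not on informers. The sketch below shows the idea with a one-method interface and an in-memory implementation keyed by "namespace/name". All names in it (service, serviceGetter, memoryStore, serviceAddr) are hypothetical and much simpler than the real Storer and corev1 types.

```go
package main

import (
	"errors"
	"fmt"
)

// service is a hypothetical stand-in for corev1.Service.
type service struct {
	Namespace string
	Name      string
	ClusterIP string
}

// serviceGetter is a one-method slice of the idea behind Storer.
type serviceGetter interface {
	GetService(key string) (*service, error)
}

// memoryStore is an in-memory implementation keyed by "namespace/name".
type memoryStore struct {
	services map[string]*service
}

var errNotFound = errors.New("service not found")

func newMemoryStore(svcs ...*service) *memoryStore {
	m := &memoryStore{services: make(map[string]*service)}
	for _, s := range svcs {
		m.services[s.Namespace+"/"+s.Name] = s
	}
	return m
}

// GetService returns the Service matching key, or an error wrapping errNotFound.
func (m *memoryStore) GetService(key string) (*service, error) {
	s, ok := m.services[key]
	if !ok {
		return nil, fmt.Errorf("%w: %s", errNotFound, key)
	}
	return s, nil
}

// serviceAddr depends only on the interface, not on how objects are cached.
func serviceAddr(store serviceGetter, key string, port int) (string, error) {
	svc, err := store.GetService(key)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("%s:%d", svc.ClusterIP, port), nil
}

func main() {
	st := newMemoryStore(&service{Namespace: "default", Name: "web", ClusterIP: "10.0.0.7"})

	addr, err := serviceAddr(st, "default/web", 80)
	fmt.Println(addr, err)

	_, err = serviceAddr(st, "default/missing", 80)
	fmt.Println(errors.Is(err, errNotFound))
}
```

Code written against such an interface can be exercised with a fake store, without a running API server.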

k8sStore

The implementation of the interface

type k8sStore struct {
	// backendConfig contains the running configuration from the configmap
	// this is required because this rarely changes but is a very expensive
	// operation to execute in each OnUpdate invocation
	backendConfig ngx_config.Configuration

	// informer contains the cache Informers
	informers *Informer

	// listers contains the cache.Store interfaces used in the ingress controller
	listers *Lister

	// sslStore local store of SSL certificates (certificates used in ingress)
	// this is required because the certificates must be present in the
	// container filesystem
	sslStore *SSLCertTracker

	annotations annotations.Extractor

	// secretIngressMap contains information about which ingress references a
	// secret in the annotations.
	secretIngressMap ObjectRefMap

	// updateCh
	updateCh *channels.RingChannel

	// syncSecretMu protects against simultaneous invocations of syncSecret
	syncSecretMu *sync.Mutex

	// backendConfigMu protects against simultaneous read/write of backendConfig
	backendConfigMu *sync.RWMutex

	defaultSSLCertificate string
}
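
The backendConfigMu field guards backendConfig with a read/write lock: many readers may fetch the configuration concurrently, while an update from the ConfigMap takes the exclusive lock. Here is a minimal sketch of that pattern; the backendConfig and configHolder types are hypothetical and far smaller than the real ngx_config.Configuration and k8sStore.

```go
package main

import (
	"fmt"
	"sync"
)

// backendConfig is a hypothetical, tiny stand-in for ngx_config.Configuration.
type backendConfig struct {
	WorkerProcesses string
}

// configHolder keeps one backendConfig behind a read/write mutex.
type configHolder struct {
	mu  sync.RWMutex
	cfg backendConfig
}

// get returns a copy of the configuration under the shared (read) lock.
func (h *configHolder) get() backendConfig {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.cfg
}

// set replaces the configuration under the exclusive (write) lock.
func (h *configHolder) set(cfg backendConfig) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.cfg = cfg
}

func main() {
	h := &configHolder{cfg: backendConfig{WorkerProcesses: "auto"}}

	// Several readers run concurrently with one writer.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = h.get()
		}()
	}
	h.set(backendConfig{WorkerProcesses: "4"})
	wg.Wait()

	fmt.Println(h.get().WorkerProcesses)
}
```

Returning a copy from get keeps callers from mutating the shared value after the lock has been released.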

New

Initialization

func New(
	namespace string,
	namespaceSelector labels.Selector,
	configmap, tcp, udp, defaultSSLCertificate string,
	resyncPeriod time.Duration,
	client clientset.Interface,
	updateCh *channels.RingChannel,
	disableCatchAll bool,
	icConfig *ingressclass.IngressClassConfiguration) Storer {

	store := &k8sStore{
		informers:             &Informer{},
		listers:               &Lister{},
		sslStore:              NewSSLCertTracker(),
		updateCh:              updateCh,
		backendConfig:         ngx_config.NewDefault(),
		syncSecretMu:          &sync.Mutex{},
		backendConfigMu:       &sync.RWMutex{},
		secretIngressMap:      NewObjectRefMap(),
		defaultSSLCertificate: defaultSSLCertificate,
	}

	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{
		Interface: client.CoreV1().Events(namespace),
	})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{
		Component: "nginx-ingress-controller",
	})

	// k8sStore fulfills resolver.Resolver interface
	store.annotations = annotations.NewAnnotationExtractor(store)

	store.listers.IngressWithAnnotation.Store = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)

	// As we currently do not filter out kubernetes objects we list, we can
	// retrieve a huge amount of data from the API server.
	// In a cluster using HELM < v3 configmaps are used to store binary data.
	// If you happen to have a lot of HELM releases in the cluster it will make
	// the memory consumption of nginx-ingress-controller explode.
	// In order to avoid that we filter out labels OWNER=TILLER.
	labelsTweakListOptionsFunc := func(options *metav1.ListOptions) {
		if len(options.LabelSelector) > 0 {
			options.LabelSelector += ",OWNER!=TILLER"
		} else {
			options.LabelSelector = "OWNER!=TILLER"
		}
	}

	// As of HELM >= v3 helm releases are stored using Secrets instead of ConfigMaps.
	// In order to avoid listing those secrets we discard type "helm.sh/release.v1"
	secretsTweakListOptionsFunc := func(options *metav1.ListOptions) {
		helmAntiSelector := fields.OneTermNotEqualSelector("type", "helm.sh/release.v1")
		baseSelector, err := fields.ParseSelector(options.FieldSelector)

		if err != nil {
			options.FieldSelector = helmAntiSelector.String()
		} else {
			options.FieldSelector = fields.AndSelectors(baseSelector, helmAntiSelector).String()
		}
	}

	// create informers factory, enable and assign required informers
	infFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
		informers.WithNamespace(namespace),
	)

	// create informers factory for configmaps
	infFactoryConfigmaps := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
		informers.WithNamespace(namespace),
		informers.WithTweakListOptions(labelsTweakListOptionsFunc),
	)

	// create informers factory for secrets
	infFactorySecrets := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
		informers.WithNamespace(namespace),
		informers.WithTweakListOptions(secretsTweakListOptionsFunc),
	)

	// Ingress resources
	store.informers.Ingress = infFactory.Networking().V1().Ingresses().Informer()
	store.listers.Ingress.Store = store.informers.Ingress.GetStore()

	// IngressClasses allow running multiple ingress-nginx controllers
	if !icConfig.IgnoreIngressClass {
		store.informers.IngressClass = infFactory.Networking().V1().IngressClasses().Informer()
		store.listers.IngressClass.Store = cache.NewStore(cache.MetaNamespaceKeyFunc)
	}

	// Endpoint
	store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer()
	store.listers.Endpoint.Store = store.informers.Endpoint.GetStore()

	// Secret
	store.informers.Secret = infFactorySecrets.Core().V1().Secrets().Informer()
	store.listers.Secret.Store = store.informers.Secret.GetStore()

	// ConfigMap
	store.informers.ConfigMap = infFactoryConfigmaps.Core().V1().ConfigMaps().Informer()
	store.listers.ConfigMap.Store = store.informers.ConfigMap.GetStore()

	// Services
	store.informers.Service = infFactory.Core().V1().Services().Informer()
	store.listers.Service.Store = store.informers.Service.GetStore()

	// avoid caching namespaces at cluster scope when watching single namespace
	if namespaceSelector != nil && !namespaceSelector.Empty() {
		// cache informers factory for namespaces
		infFactoryNamespaces := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
			informers.WithTweakListOptions(labelsTweakListOptionsFunc),
		)

		store.informers.Namespace = infFactoryNamespaces.Core().V1().Namespaces().Informer()
		store.listers.Namespace.Store = store.informers.Namespace.GetStore()
	}

	watchedNamespace := func(namespace string) bool {
		if namespaceSelector == nil || namespaceSelector.Empty() {
			return true
		}

		item, ok, err := store.listers.Namespace.GetByKey(namespace)
		if !ok {
			klog.Errorf("Namespace %s not existed: %v.", namespace, err)
			return false
		}
		ns, ok := item.(*corev1.Namespace)
		if !ok {
			return false
		}

		return namespaceSelector.Matches(labels.Set(ns.Labels))
	}

	ingDeleteHandler := func(obj interface{}) {
		ing, ok := toIngress(obj)
		if !ok {
			// If we reached here it means the ingress was deleted but its final state is unrecorded.
			tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
			if !ok {
				klog.ErrorS(nil, "Error obtaining object from tombstone", "key", obj)
				return
			}
			ing, ok = tombstone.Obj.(*networkingv1.Ingress)
			if !ok {
				klog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
				return
			}
		}

		if !watchedNamespace(ing.Namespace) {
			return
		}

		_, err := store.GetIngressClass(ing, icConfig)
		if err != nil {
			klog.InfoS("Ignoring ingress because of error while validating ingress class", "ingress", klog.KObj(ing), "error", err)
			return
		}

		if hasCatchAllIngressRule(ing.Spec) && disableCatchAll {
			klog.InfoS("Ignoring delete for catch-all because of --disable-catch-all", "ingress", klog.KObj(ing))
			return
		}

		store.listers.IngressWithAnnotation.Delete(ing)

		key := k8s.MetaNamespaceKey(ing)
		store.secretIngressMap.Delete(key)

		updateCh.In() <- Event{
			Type: DeleteEvent,
			Obj:  obj,
		}
	}

	ingEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			ing, _ := toIngress(obj)

			if !watchedNamespace(ing.Namespace) {
				return
			}

			ic, err := store.GetIngressClass(ing, icConfig)
			if err != nil {
				klog.InfoS("Ignoring ingress because of error while validating ingress class", "ingress", klog.KObj(ing), "error", err)
				return
			}

			klog.InfoS("Found valid IngressClass", "ingress", klog.KObj(ing), "ingressclass", ic)

			if hasCatchAllIngressRule(ing.Spec) && disableCatchAll {
				klog.InfoS("Ignoring add for catch-all ingress because of --disable-catch-all", "ingress", klog.KObj(ing))
				return
			}

			recorder.Eventf(ing, corev1.EventTypeNormal, "Sync", "Scheduled for sync")

			store.syncIngress(ing)
			store.updateSecretIngressMap(ing)
			store.syncSecrets(ing)

			updateCh.In() <- Event{
				Type: CreateEvent,
				Obj:  obj,
			}
		},
		DeleteFunc: ingDeleteHandler,
		UpdateFunc: func(old, cur interface{}) {
			oldIng, _ := toIngress(old)
			curIng, _ := toIngress(cur)

			if !watchedNamespace(oldIng.Namespace) {
				return
			}

			var errOld, errCur error
			var classCur string
			if !icConfig.IgnoreIngressClass {
				_, errOld = store.GetIngressClass(oldIng, icConfig)
				classCur, errCur = store.GetIngressClass(curIng, icConfig)
			}
			if errOld != nil && errCur == nil {
				if hasCatchAllIngressRule(curIng.Spec) && disableCatchAll {
					klog.InfoS("ignoring update for catch-all ingress because of --disable-catch-all", "ingress", klog.KObj(curIng))
					return
				}

				klog.InfoS("creating ingress", "ingress", klog.KObj(curIng), "ingressclass", classCur)
				recorder.Eventf(curIng, corev1.EventTypeNormal, "Sync", "Scheduled for sync")
			} else if errOld == nil && errCur != nil {
				klog.InfoS("removing ingress because of unknown ingressclass", "ingress", klog.KObj(curIng))
				ingDeleteHandler(old)
				return
			} else if errCur == nil && !reflect.DeepEqual(old, cur) {
				if hasCatchAllIngressRule(curIng.Spec) && disableCatchAll {
					klog.InfoS("ignoring update for catch-all ingress and delete old one because of --disable-catch-all", "ingress", klog.KObj(curIng))
					ingDeleteHandler(old)
					return
				}

				recorder.Eventf(curIng, corev1.EventTypeNormal, "Sync", "Scheduled for sync")
			} else {
				klog.V(3).InfoS("No changes on ingress. Skipping update", "ingress", klog.KObj(curIng))
				return
			}

			store.syncIngress(curIng)
			store.updateSecretIngressMap(curIng)
			store.syncSecrets(curIng)

			updateCh.In() <- Event{
				Type: UpdateEvent,
				Obj:  cur,
			}
		},
	}

	ingressClassEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			ingressclass := obj.(*networkingv1.IngressClass)
			foundClassByName := false
			if icConfig.IngressClassByName && ingressclass.Name == icConfig.AnnotationValue {
				klog.InfoS("adding ingressclass as ingress-class-by-name is configured", "ingressclass", klog.KObj(ingressclass))
				foundClassByName = true
			}
			if !foundClassByName && ingressclass.Spec.Controller != icConfig.Controller {
				klog.InfoS("ignoring ingressclass as the spec.controller is not the same of this ingress", "ingressclass", klog.KObj(ingressclass))
				return
			}
			err := store.listers.IngressClass.Add(ingressclass)
			if err != nil {
				klog.InfoS("error adding ingressclass to store", "ingressclass", klog.KObj(ingressclass), "error", err)
				return
			}

			updateCh.In() <- Event{
				Type: CreateEvent,
				Obj:  obj,
			}
		},
		DeleteFunc: func(obj interface{}) {
			ingressclass := obj.(*networkingv1.IngressClass)
			if ingressclass.Spec.Controller != icConfig.Controller {
				klog.InfoS("ignoring ingressclass as the spec.controller is not the same of this ingress", "ingressclass", klog.KObj(ingressclass))
				return
			}
			err := store.listers.IngressClass.Delete(ingressclass)
			if err != nil {
				klog.InfoS("error removing ingressclass from store", "ingressclass", klog.KObj(ingressclass), "error", err)
				return
			}
			updateCh.In() <- Event{
				Type: DeleteEvent,
				Obj:  obj,
			}
		},
		UpdateFunc: func(old, cur interface{}) {
			oic := old.(*networkingv1.IngressClass)
			cic := cur.(*networkingv1.IngressClass)
			if cic.Spec.Controller != icConfig.Controller {
				klog.InfoS("ignoring ingressclass as the spec.controller is not the same of this ingress", "ingressclass", klog.KObj(cic))
				return
			}
			// TODO: In a future we might be interested in parse parameters and use as
			// current IngressClass for this case, crossing with configmap
			if !reflect.DeepEqual(cic.Spec.Parameters, oic.Spec.Parameters) {
				err := store.listers.IngressClass.Update(cic)
				if err != nil {
					klog.InfoS("error updating ingressclass in store", "ingressclass", klog.KObj(cic), "error", err)
					return
				}
				updateCh.In() <- Event{
					Type: UpdateEvent,
					Obj:  cur,
				}
			}
		},
	}

	secrEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			sec := obj.(*corev1.Secret)
			key := k8s.MetaNamespaceKey(sec)

			if store.defaultSSLCertificate == key {
				store.syncSecret(store.defaultSSLCertificate)
			}

			// find references in ingresses and update local ssl certs
			if ings := store.secretIngressMap.Reference(key); len(ings) > 0 {
				klog.InfoS("Secret was added and it is used in ingress annotations. Parsing", "secret", key)
				for _, ingKey := range ings {
					ing, err := store.getIngress(ingKey)
					if err != nil {
						klog.Errorf("could not find Ingress %v in local store", ingKey)
						continue
					}
					store.syncIngress(ing)
					store.syncSecrets(ing)
				}
				updateCh.In() <- Event{
					Type: CreateEvent,
					Obj:  obj,
				}
			}
		},
		UpdateFunc: func(old, cur interface{}) {
			if !reflect.DeepEqual(old, cur) {
				sec := cur.(*corev1.Secret)
				key := k8s.MetaNamespaceKey(sec)

				if !watchedNamespace(sec.Namespace) {
					return
				}

				if store.defaultSSLCertificate == key {
					store.syncSecret(store.defaultSSLCertificate)
				}

				// find references in ingresses and update local ssl certs
				if ings := store.secretIngressMap.Reference(key); len(ings) > 0 {
					klog.InfoS("secret was updated and it is used in ingress annotations. Parsing", "secret", key)
					for _, ingKey := range ings {
						ing, err := store.getIngress(ingKey)
						if err != nil {
							klog.ErrorS(err, "could not find Ingress in local store", "ingress", ingKey)
							continue
						}
						store.syncSecrets(ing)
						store.syncIngress(ing)
					}
					updateCh.In() <- Event{
						Type: UpdateEvent,
						Obj:  cur,
					}
				}
			}
		},
		DeleteFunc: func(obj interface{}) {
			sec, ok := obj.(*corev1.Secret)
			if !ok {
				// If we reached here it means the secret was deleted but its final state is unrecorded.
				tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
				if !ok {
					return
				}

				sec, ok = tombstone.Obj.(*corev1.Secret)
				if !ok {
					return
				}
			}

			if !watchedNamespace(sec.Namespace) {
				return
			}

			store.sslStore.Delete(k8s.MetaNamespaceKey(sec))

			key := k8s.MetaNamespaceKey(sec)

			// find references in ingresses
			if ings := store.secretIngressMap.Reference(key); len(ings) > 0 {
				klog.InfoS("secret was deleted and it is used in ingress annotations. Parsing", "secret", key)
				for _, ingKey := range ings {
					ing, err := store.getIngress(ingKey)
					if err != nil {
						klog.Errorf("could not find Ingress %v in local store", ingKey)
						continue
					}
					store.syncIngress(ing)
				}

				updateCh.In() <- Event{
					Type: DeleteEvent,
					Obj:  obj,
				}
			}
		},
	}

	epEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			updateCh.In() <- Event{
				Type: CreateEvent,
				Obj:  obj,
			}
		},
		DeleteFunc: func(obj interface{}) {
			updateCh.In() <- Event{
				Type: DeleteEvent,
				Obj:  obj,
			}
		},
		UpdateFunc: func(old, cur interface{}) {
			oep := old.(*corev1.Endpoints)
			cep := cur.(*corev1.Endpoints)
			if !reflect.DeepEqual(cep.Subsets, oep.Subsets) {
				updateCh.In() <- Event{
					Type: UpdateEvent,
					Obj:  cur,
				}
			}
		},
	}

	// TODO: add e2e test to verify that changes to one or more configmap trigger an update
	changeTriggerUpdate := func(name string) bool {
		return name == configmap || name == tcp || name == udp
	}

	handleCfgMapEvent := func(key string, cfgMap *corev1.ConfigMap, eventName string) {
		// updates to configuration configmaps can trigger an update
		triggerUpdate := false
		if changeTriggerUpdate(key) {
			triggerUpdate = true
			recorder.Eventf(cfgMap, corev1.EventTypeNormal, eventName, fmt.Sprintf("ConfigMap %v", key))
			if key == configmap {
				store.setConfig(cfgMap)
			}
		}

		ings := store.listers.IngressWithAnnotation.List()
		for _, ingKey := range ings {
			key := k8s.MetaNamespaceKey(ingKey)
			ing, err := store.getIngress(key)
			if err != nil {
				klog.Errorf("could not find Ingress %v in local store: %v", key, err)
				continue
			}

			if parser.AnnotationsReferencesConfigmap(ing) {
				store.syncIngress(ing)
				continue
			}

			if triggerUpdate {
				store.syncIngress(ing)
			}
		}

		if triggerUpdate {
			updateCh.In() <- Event{
				Type: ConfigurationEvent,
				Obj:  cfgMap,
			}
		}
	}

	cmEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			cfgMap := obj.(*corev1.ConfigMap)
			key := k8s.MetaNamespaceKey(cfgMap)
			handleCfgMapEvent(key, cfgMap, "CREATE")
		},
		UpdateFunc: func(old, cur interface{}) {
			if reflect.DeepEqual(old, cur) {
				return
			}

			cfgMap := cur.(*corev1.ConfigMap)
			key := k8s.MetaNamespaceKey(cfgMap)
			handleCfgMapEvent(key, cfgMap, "UPDATE")
		},
	}

	serviceHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			svc := obj.(*corev1.Service)
			if svc.Spec.Type == corev1.ServiceTypeExternalName {
				updateCh.In() <- Event{
					Type: CreateEvent,
					Obj:  obj,
				}
			}
		},
		DeleteFunc: func(obj interface{}) {
			svc := obj.(*corev1.Service)
			if svc.Spec.Type == corev1.ServiceTypeExternalName {
				updateCh.In() <- Event{
					Type: DeleteEvent,
					Obj:  obj,
				}
			}
		},
		UpdateFunc: func(old, cur interface{}) {
			oldSvc := old.(*corev1.Service)
			curSvc := cur.(*corev1.Service)

			if reflect.DeepEqual(oldSvc, curSvc) {
				return
			}

			updateCh.In() <- Event{
				Type: UpdateEvent,
				Obj:  cur,
			}
		},
	}

	store.informers.Ingress.AddEventHandler(ingEventHandler)
	if !icConfig.IgnoreIngressClass {
		store.informers.IngressClass.AddEventHandler(ingressClassEventHandler)
	}
	store.informers.Endpoint.AddEventHandler(epEventHandler)
	store.informers.Secret.AddEventHandler(secrEventHandler)
	store.informers.ConfigMap.AddEventHandler(cmEventHandler)
	store.informers.Service.AddEventHandler(serviceHandler)

	// do not wait for informers to read the configmap configuration
	ns, name, _ := k8s.ParseNameNS(configmap)
	cm, err := client.CoreV1().ConfigMaps(ns).Get(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		klog.Warningf("Unexpected error reading configuration configmap: %v", err)
	}

	store.setConfig(cm)
	return store
}

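Three things stand out in this function:

  • The controller watches six kinds of resources: Ingress, IngressClass, Endpoint, Secret, ConfigMap and Service.
  • Ingress objects picked up by the informers are processed through syncIngress.
  • When a watched resource changes, an Event is generated and sent out over the updateCh channel.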
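The "filter, then forward on a channel" idea behind these handlers can be sketched with the standard library alone. This is a minimal illustration, not the controller's code: the Endpoints struct is a hypothetical stand-in for corev1.Endpoints, and the string values of the event constants are assumptions made for the example.

```go
package main

import (
	"fmt"
	"reflect"
)

// EventType and Event imitate the shape of what is sent on updateCh.
// The constant values are assumptions made for this sketch.
type EventType string

const (
	CreateEvent EventType = "CREATE"
	UpdateEvent EventType = "UPDATE"
	DeleteEvent EventType = "DELETE"
)

type Event struct {
	Type EventType
	Obj  interface{}
}

// Endpoints is a hypothetical stand-in for corev1.Endpoints.
type Endpoints struct {
	Name    string
	Subsets []string
}

// onUpdate forwards an UpdateEvent only when the subsets really changed,
// the same filtering idea as in epEventHandler.UpdateFunc.
// It reports whether an event was sent.
func onUpdate(updateCh chan<- Event, old, cur Endpoints) bool {
	if reflect.DeepEqual(old.Subsets, cur.Subsets) {
		return false
	}
	updateCh <- Event{Type: UpdateEvent, Obj: cur}
	return true
}

func main() {
	updateCh := make(chan Event, 4)
	a := Endpoints{Name: "web", Subsets: []string{"10.0.0.1:80"}}
	b := Endpoints{Name: "web", Subsets: []string{"10.0.0.1:80", "10.0.0.2:80"}}

	fmt.Println(onUpdate(updateCh, a, a)) // identical subsets: nothing is sent
	fmt.Println(onUpdate(updateCh, a, b)) // subsets differ: one event is sent
	fmt.Println(len(updateCh))            // number of queued events
}
```

Filtering at the handler keeps resync notifications, where old and new objects are identical, from triggering a needless sync further down the pipeline.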

Let's look at the syncIngress function.

k8sStore.syncIngress

func (s *k8sStore) syncIngress(ing *networkingv1.Ingress) {
	key := k8s.MetaNamespaceKey(ing)
	klog.V(3).Infof("updating annotations information for ingress %v", key)

	copyIng := &networkingv1.Ingress{}
	ing.ObjectMeta.DeepCopyInto(&copyIng.ObjectMeta)

	if s.backendConfig.AnnotationValueWordBlocklist != "" {
		if err := checkBadAnnotationValue(copyIng.Annotations, s.backendConfig.AnnotationValueWordBlocklist); err != nil {
			klog.Warningf("skipping ingress %s: %s", key, err)
			return
		}
	}

	ing.Spec.DeepCopyInto(&copyIng.Spec)
	ing.Status.DeepCopyInto(&copyIng.Status)

	for ri, rule := range copyIng.Spec.Rules {
		if rule.HTTP == nil {
			continue
		}

		for pi, path := range rule.HTTP.Paths {
			if path.Path == "" {
				copyIng.Spec.Rules[ri].HTTP.Paths[pi].Path = "/"
			}
		}
	}

	k8s.SetDefaultNGINXPathType(copyIng)

	err := s.listers.IngressWithAnnotation.Update(&ingress.Ingress{
		Ingress:           *copyIng,
		ParsedAnnotations: s.annotations.Extract(ing),
	})
	if err != nil {
		klog.Error(err)
	}
}

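The main job of this function is to extract the annotations from the Ingress, since much of the NGINX configuration is driven by annotations. It defaults empty paths to "/" and stores a copy of the Ingress together with the parsed annotations in the IngressWithAnnotation lister.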
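To make the two steps concrete, here is a heavily simplified sketch. The real s.annotations.Extract runs a dedicated parser per annotation; the prefix filtering below is only an assumption-based stand-in, and normalizePaths and extractAnnotations are hypothetical helper names.

```go
package main

import (
	"fmt"
	"strings"
)

// annotationPrefix is the usual prefix of ingress-nginx annotations.
const annotationPrefix = "nginx.ingress.kubernetes.io/"

// normalizePaths returns a copy of paths in which empty entries become "/",
// the same defaulting syncIngress applies to rule paths.
func normalizePaths(paths []string) []string {
	out := make([]string, len(paths))
	for i, p := range paths {
		if p == "" {
			p = "/"
		}
		out[i] = p
	}
	return out
}

// extractAnnotations keeps only the annotations that carry the controller
// prefix and strips that prefix from the key.
func extractAnnotations(all map[string]string) map[string]string {
	parsed := make(map[string]string)
	for k, v := range all {
		if strings.HasPrefix(k, annotationPrefix) {
			parsed[strings.TrimPrefix(k, annotationPrefix)] = v
		}
	}
	return parsed
}

func main() {
	fmt.Println(normalizePaths([]string{"", "/api"}))
	fmt.Println(extractAnnotations(map[string]string{
		"nginx.ingress.kubernetes.io/rewrite-target": "/",
		"example.com/owner":                          "team-a",
	}))
}
```

Working on a copy, as the original does with DeepCopyInto, keeps the object in the informer cache untouched.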

k8sStore.Run

The Run method of k8sStore:

func (s *k8sStore) Run(stopCh chan struct{}) {
	// start informers
	s.informers.Run(stopCh) //s.informers=&Informer{}
}

It simply calls Informer.Run:

func (i *Informer) Run(stopCh chan struct{}) {
	go i.Secret.Run(stopCh)
	go i.Endpoint.Run(stopCh)
	if i.IngressClass != nil {
		go i.IngressClass.Run(stopCh)
	}
	go i.Service.Run(stopCh)
	go i.ConfigMap.Run(stopCh)

	// wait for all involved caches to be synced before processing items
	// from the queue
	if !cache.WaitForCacheSync(stopCh,
		i.Endpoint.HasSynced,
		i.Service.HasSynced,
		i.Secret.HasSynced,
		i.ConfigMap.HasSynced,
	) {
		runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
	}
	if i.IngressClass != nil && !cache.WaitForCacheSync(stopCh, i.IngressClass.HasSynced) {
		runtime.HandleError(fmt.Errorf("timed out waiting for ingress classcaches to sync"))
	}

	// when limit controller scope to one namespace, skip sync namespaces at cluster scope
	if i.Namespace != nil {
		go i.Namespace.Run(stopCh)

		if !cache.WaitForCacheSync(stopCh, i.Namespace.HasSynced) {
			runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
		}
	}

	// in big clusters, deltas can keep arriving even after HasSynced
	// functions have returned 'true'
	time.Sleep(1 * time.Second)

	// we can start syncing ingress objects only after other caches are
	// ready, because ingress rules require content from other listers, and
	// 'add' events get triggered in the handlers during caches population.
	go i.Ingress.Run(stopCh)
	if !cache.WaitForCacheSync(stopCh,
		i.Ingress.HasSynced,
	) {
		runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
	}
}

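This code should look familiar by now. It is the standard pattern, and if you write your own controller you can follow it as is: start several goroutines that List/Watch the objects. Note the ordering: the Ingress informer is started last, once the caches it depends on have synced.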

syncQueue

NGINXController.syncQueue is assigned when the NGINXController is initialized; it is a task queue.

n.syncQueue = task.NewTaskQueue(n.syncIngress)

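The task package is small. Like most task queues it has to expose a way to add tasks; it offers two methods, which differ only in whether the task may be skipped.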

func (t *Queue) EnqueueTask(obj interface{}) {
	t.enqueue(obj, false)
}

// EnqueueSkippableTask enqueues ns/name of the given api object in
// the task queue that can be skipped
func (t *Queue) EnqueueSkippableTask(obj interface{}) {
	t.enqueue(obj, true)
}

It also needs a worker goroutine that keeps reading tasks from the queue and processing them.

func (t *Queue) Run(period time.Duration, stopCh <-chan struct{}) {
	wait.Until(t.worker, period, stopCh)
}

func (t *Queue) worker() {
	for {
		key, quit := t.queue.Get()
		if quit {
			if !isClosed(t.workerDone) {
				close(t.workerDone)
			}
			return
		}
		ts := time.Now().UnixNano()

		item := key.(Element)
		if item.Timestamp != 0 && t.lastSync > item.Timestamp {
			klog.V(3).InfoS("skipping sync", "key", item.Key, "last", t.lastSync, "now", item.Timestamp)
			t.queue.Forget(key)
			t.queue.Done(key)
			continue
		}

		klog.V(3).InfoS("syncing", "key", item.Key)
		if err := t.sync(key); err != nil { // hand the task to the sync function
			klog.ErrorS(err, "requeuing", "key", item.Key)
			t.queue.AddRateLimited(Element{
				Key:       item.Key,
				Timestamp: 0,
			})
		} else {
			t.queue.Forget(key)
			t.lastSync = ts
		}

		t.queue.Done(key)
	}
}
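The skip rule in worker is easy to miss: a task is dropped when a sync has already completed after the task was enqueued, and a Timestamp of 0 (used for requeued items above) is never skipped. A minimal sketch of just that rule follows, with a logical clock instead of time.Now(); shouldSkip and drain are hypothetical names introduced for the example.

```go
package main

import "fmt"

// Element imitates the shape of a queue item: a key plus an enqueue timestamp.
type Element struct {
	Key       string
	Timestamp int64
}

// shouldSkip reports whether the item can be dropped because a sync already
// finished after it was enqueued. Timestamp 0 is never skipped.
func shouldSkip(item Element, lastSync int64) bool {
	return item.Timestamp != 0 && lastSync > item.Timestamp
}

// drain processes the items in order and returns the keys that were synced.
// now supplies the (logical) time at which each item is picked up.
func drain(items []Element, now func() int64, syncFn func(key string)) []string {
	var lastSync int64
	var synced []string
	for _, item := range items {
		ts := now()
		if shouldSkip(item, lastSync) {
			continue
		}
		syncFn(item.Key)
		synced = append(synced, item.Key)
		lastSync = ts
	}
	return synced
}

func main() {
	clock := int64(10) // the worker starts after all items were enqueued
	now := func() int64 { clock++; return clock }

	items := []Element{
		{Key: "ing-a", Timestamp: 1},
		{Key: "ing-b", Timestamp: 2},
		{Key: "ing-c", Timestamp: 3},
		{Key: "retry", Timestamp: 0},
	}
	// ing-b and ing-c are skipped: the sync triggered by ing-a happened
	// after they were enqueued. The retry item is processed regardless.
	fmt.Println(drain(items, now, func(string) {}))
}
```

Skipping is safe here because the sync function ignores the key and rebuilds the whole configuration, so one sync covers every event enqueued before it.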

How is a task handled once it has been taken off the queue?

It is passed to the sync function.

And what is sync? Read on.

func NewTaskQueue(syncFn func(interface{}) error) *Queue {
	return NewCustomTaskQueue(syncFn, nil)
}
func NewCustomTaskQueue(syncFn func(interface{}) error, fn func(interface{}) (interface{}, error)) *Queue {
	q := &Queue{
		queue:      workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		sync:       syncFn,
		workerDone: make(chan bool),
		fn:         fn,
	}

	if fn == nil {
		q.fn = q.defaultKeyFunc
	}

	return q
}

The sync function is passed in when the Queue is created.

And when is the Queue created?

It is the queue created above, during NGINXController initialization:

n.syncQueue = task.NewTaskQueue(n.syncIngress)

n.syncIngress is handed to the queue as Queue.sync.

Next, let's look at n.syncIngress.

syncIngress

// syncIngress collects all the pieces required to assemble the NGINX
// configuration file and passes the resulting data structures to the backend
// (OnUpdate) when a reload is deemed necessary.
func (n *NGINXController) syncIngress(interface{}) error {
	n.syncRateLimiter.Accept()

	if n.syncQueue.IsShuttingDown() {
		return nil
	}

	// 1. Fetch the Ingresses from the local cache
	ings := n.store.ListIngresses()
	// 2. Parse them into the configuration model
	hosts, servers, pcfg := n.getConfiguration(ings)

	n.metricCollector.SetSSLExpireTime(servers)

	// 3. Compare with the running configuration to see whether anything changed
	if n.runningConfig.Equal(pcfg) {
		klog.V(3).Infof("No configuration change detected, skipping backend reload")
		return nil
	}

	n.metricCollector.SetHosts(hosts)

	// 4. The change cannot be applied dynamically through Lua: generate a new nginx.conf and reload
	if !n.IsDynamicConfigurationEnough(pcfg) {
		klog.InfoS("Configuration changes detected, backend reload required")

		hash, _ := hashstructure.Hash(pcfg, &hashstructure.HashOptions{
			TagName: "json",
		})

		pcfg.ConfigurationChecksum = fmt.Sprintf("%v", hash)

		err := n.OnUpdate(*pcfg) // generate the new configuration file and reload
		if err != nil {
			n.metricCollector.IncReloadErrorCount()
			n.metricCollector.ConfigSuccess(hash, false)
			klog.Errorf("Unexpected failure reloading the backend:\n%v", err)
			n.recorder.Eventf(k8s.IngressPodDetails, apiv1.EventTypeWarning, "RELOAD", fmt.Sprintf("Error reloading NGINX: %v", err))
			return err
		}

		klog.InfoS("Backend successfully reloaded")
		n.metricCollector.ConfigSuccess(hash, true)
		n.metricCollector.IncReloadCount()

		n.recorder.Eventf(k8s.IngressPodDetails, apiv1.EventTypeNormal, "RELOAD", "NGINX reload triggered due to a change in configuration")
	}

	isFirstSync := n.runningConfig.Equal(&ingress.Configuration{}) // whether this is the first sync
	if isFirstSync {
		// For the initial sync it always takes some time for NGINX to start listening
		// For large configurations it might take a while so we loop and back off
		klog.InfoS("Initial sync, sleeping for 1 second")
		time.Sleep(1 * time.Second)
	}

	// 5. Update the configuration dynamically through Lua
	retry := wait.Backoff{
		Steps:    15,
		Duration: 1 * time.Second,
		Factor:   0.8,
		Jitter:   0.1,
	}

	err := wait.ExponentialBackoff(retry, func() (bool, error) {
		err := n.configureDynamically(pcfg) // call the Lua endpoints to update dynamically
		if err == nil {
			klog.V(2).Infof("Dynamic reconfiguration succeeded.")
			return true, nil
		}

		klog.Warningf("Dynamic reconfiguration failed: %v", err)
		return false, err
	})
	if err != nil {
		klog.Errorf("Unexpected failure reconfiguring NGINX:\n%v", err)
		return err
	}

	ri := getRemovedIngresses(n.runningConfig, pcfg)
	re := getRemovedHosts(n.runningConfig, pcfg)
	n.metricCollector.RemoveMetrics(ri, re)

	n.runningConfig = pcfg // 6. keep it as the running configuration for the next comparison

	return nil
}

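With all those layers peeled away, the answer is nearly in sight. syncIngress does the following:

  1. Fetch the Ingresses from the local cache.
  2. Parse them into the configuration model.
  3. Compare the result with the running configuration to decide whether anything changed.
  4. If the change cannot be applied dynamically through Lua, generate a new nginx.conf and reload.
  5. Apply the dynamic part of the configuration through Lua.
  6. Record the new configuration as the running one, for comparison on the next run.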
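The decision at the heart of these steps (nothing to do, a dynamic update only, or a reload followed by a dynamic update) can be sketched as follows. Config is a hypothetical, drastically reduced stand-in for ingress.Configuration, and the rule "only backend changes are dynamic-safe" is an assumption of this sketch; the real IsDynamicConfigurationEnough is not shown in this article.

```go
package main

import (
	"fmt"
	"reflect"
)

// Config is a hypothetical stand-in for ingress.Configuration.
type Config struct {
	Servers  []string            // server names; in this sketch a change here requires a reload
	Backends map[string][]string // backend name -> endpoints; can be pushed through Lua
}

type Action string

const (
	Noop              Action = "noop"
	DynamicOnly       Action = "dynamic update only"
	ReloadThenDynamic Action = "reload, then dynamic update"
)

// decide compares the running and the desired configuration and picks the
// cheapest action that brings NGINX up to date.
func decide(running, desired Config) Action {
	if reflect.DeepEqual(running, desired) {
		return Noop // step 3: nothing changed
	}
	// Blank out the part Lua can handle and compare again: if the rest is
	// equal, no new nginx.conf is needed (assumption of this sketch).
	r, d := running, desired
	r.Backends, d.Backends = nil, nil
	if reflect.DeepEqual(r, d) {
		return DynamicOnly // step 5 only
	}
	return ReloadThenDynamic // step 4, then step 5
}

func main() {
	running := Config{
		Servers:  []string{"a.example.com"},
		Backends: map[string][]string{"web": {"10.0.0.1:80"}},
	}
	scaled := Config{
		Servers:  []string{"a.example.com"},
		Backends: map[string][]string{"web": {"10.0.0.1:80", "10.0.0.2:80"}},
	}
	newHost := Config{
		Servers:  []string{"a.example.com", "b.example.com"},
		Backends: map[string][]string{"web": {"10.0.0.1:80"}},
	}

	fmt.Println(decide(running, running))
	fmt.Println(decide(running, scaled))
	fmt.Println(decide(running, newHost))
}
```

A pod scaling event therefore costs a single HTTP call to the Lua endpoints, while a new host or a changed annotation goes through the full render and reload path.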

OnUpdate

func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
	cfg := n.store.GetBackendConfiguration()
	cfg.Resolver = n.resolver

	// render the contents of nginx.conf from the template
	content, err := n.generateTemplate(cfg, ingressCfg)
	if err != nil {
		return err
	}

	// OpenTracing-related configuration
	err = createOpentracingCfg(cfg)
	if err != nil {
		return err
	}

	// validate the configuration file with nginx -t
	err = n.testTemplate(content)
	if err != nil {
		return err
	}

	if klog.V(2).Enabled() {
		// ... (debug output, omitted)
	}

	// write /etc/nginx/nginx.conf
	err = os.WriteFile(cfgPath, content, file.ReadWriteByUser)
	if err != nil {
		return err
	}

	// nginx -s reload
	o, err := n.command.ExecCommand("-s", "reload").CombinedOutput()
	if err != nil {
		return fmt.Errorf("%v\n%v", err, string(o))
	}

	return nil
}

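This is where nginx.conf is generated: OnUpdate renders the template, validates the result with nginx -t, writes it to disk and finally runs nginx -s reload.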

configureDynamically

configureDynamically applies changes at runtime, without a reload:

func (n *NGINXController) configureDynamically(pcfg *ingress.Configuration) error {
	backendsChanged := !reflect.DeepEqual(n.runningConfig.Backends, pcfg.Backends)
	if backendsChanged {
		err := configureBackends(pcfg.Backends)
		if err != nil {
			return err
		}
	}

	streamConfigurationChanged := !reflect.DeepEqual(n.runningConfig.TCPEndpoints, pcfg.TCPEndpoints) || !reflect.DeepEqual(n.runningConfig.UDPEndpoints, pcfg.UDPEndpoints)
	if streamConfigurationChanged {
		err := updateStreamConfiguration(pcfg.TCPEndpoints, pcfg.UDPEndpoints)
		if err != nil {
			return err
		}
	}

	serversChanged := !reflect.DeepEqual(n.runningConfig.Servers, pcfg.Servers)
	if serversChanged {
		err := configureCertificates(pcfg.Servers)
		if err != nil {
			return err
		}
	}

	return nil
}

func configureBackends(rawBackends []*ingress.Backend) error {
	backends := make([]*ingress.Backend, len(rawBackends))

	for i, backend := range rawBackends {
		var service *apiv1.Service
		if backend.Service != nil {
			service = &apiv1.Service{Spec: backend.Service.Spec}
		}
		luaBackend := &ingress.Backend{
			Name:                 backend.Name,
			Port:                 backend.Port,
			SSLPassthrough:       backend.SSLPassthrough,
			SessionAffinity:      backend.SessionAffinity,
			UpstreamHashBy:       backend.UpstreamHashBy,
			LoadBalancing:        backend.LoadBalancing,
			Service:              service,
			NoServer:             backend.NoServer,
			TrafficShapingPolicy: backend.TrafficShapingPolicy,
			AlternativeBackends:  backend.AlternativeBackends,
		}

		var endpoints []ingress.Endpoint
		for _, endpoint := range backend.Endpoints {
			endpoints = append(endpoints, ingress.Endpoint{
				Address: endpoint.Address,
				Port:    endpoint.Port,
			})
		}

		luaBackend.Endpoints = endpoints
		backends[i] = luaBackend
	}

	statusCode, _, err := nginx.NewPostStatusRequest("/configuration/backends", "application/json", backends)
	if err != nil {
		return err
	}

	if statusCode != http.StatusCreated {
		return fmt.Errorf("unexpected error code: %d", statusCode)
	}

	return nil
}

Some background first:

By default the NGINX Ingress controller puts the endpoints into the NGINX upstream block; the nginx.ingress.kubernetes.io/service-upstream annotation makes it use the Service's Cluster IP instead.

By default the NGINX ingress controller uses a list of all endpoints (Pod IP/port) in the NGINX upstream configuration.

The nginx.ingress.kubernetes.io/service-upstream annotation disables that behavior and instead uses a single upstream in NGINX, the service’s Cluster IP and port.

This can be desirable for things like zero-downtime deployments. See issue #257.

There has indeed been debate about which approach to choose; see issue #257 for details. At first I also wondered why the Service IP is not used.

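Back to the topic: why is dynamic updating needed?

In a Kubernetes cluster the endpoints of a Service can change frequently. If they were all written statically into nginx.conf, every endpoint change would require an nginx -s reload, which under high concurrency can increase latency for some requests. So lua-nginx-module is used to update the upstream nodes dynamically from Lua.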

The Lua side

Here are the endpoints the Lua code exposes to the controller:

-- https://github.com/kubernetes/ingress-nginx/blob/controller-v1.1.0/rootfs/etc/nginx/lua/configuration.lua
function _M.call()
  if ngx.var.request_method ~= "POST" and ngx.var.request_method ~= "GET" then
    ngx.status = ngx.HTTP_BAD_REQUEST
    ngx.print("Only POST and GET requests are allowed!")
    return
  end

  if ngx.var.request_uri == "/configuration/servers" then
    handle_servers()
    return
  end

  -- deprecated: formerly used to obtain the number of running controller pods
  if ngx.var.request_uri == "/configuration/general" then
    handle_general()
    return
  end

  if ngx.var.uri == "/configuration/certs" then
    handle_certs()
    return
  end

  if ngx.var.request_uri == "/configuration/backends" then
    handle_backends()
    return
  end

  ngx.status = ngx.HTTP_NOT_FOUND
  ngx.print("Not found!")
end

setmetatable(_M, {__index = { handle_servers = handle_servers }})

return _M

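There are four endpoints in total; they mainly serve to modify upstream nodes and to manage SSL certificates.

Of these, /configuration/general is deprecated. Older versions used it to obtain the number of running controller pods; that number is now obtained from the API server.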
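For readers more at home in Go than in Lua, the dispatch logic of _M.call can be transcribed roughly as below. This is a transcription for illustration only, with placeholder handler bodies. One detail is kept on purpose: the Lua code matches /configuration/certs against ngx.var.uri (the path without arguments) but the other three against ngx.var.request_uri (the full original URI including the query string); r.URL.Path and r.RequestURI play those roles here.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// dispatch mirrors the routing of _M.call; the handler bodies are placeholders.
func dispatch(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost && r.Method != http.MethodGet {
		w.WriteHeader(http.StatusBadRequest)
		fmt.Fprint(w, "Only POST and GET requests are allowed!")
		return
	}
	switch {
	case r.RequestURI == "/configuration/servers":
		fmt.Fprint(w, "servers")
	case r.RequestURI == "/configuration/general":
		fmt.Fprint(w, "general")
	case r.URL.Path == "/configuration/certs": // path only, so a query string is tolerated
		fmt.Fprint(w, "certs")
	case r.RequestURI == "/configuration/backends":
		fmt.Fprint(w, "backends")
	default:
		w.WriteHeader(http.StatusNotFound)
		fmt.Fprint(w, "Not found!")
	}
}

// call runs one in-memory request through dispatch and returns status and body.
func call(method, target string) (int, string) {
	rec := httptest.NewRecorder()
	dispatch(rec, httptest.NewRequest(method, target, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	requests := [][2]string{
		{"POST", "/configuration/backends"},
		{"GET", "/configuration/certs?hostname=a.example.com"},
		{"GET", "/configuration/backends?x=1"},
		{"DELETE", "/configuration/backends"},
	}
	for _, rq := range requests {
		code, body := call(rq[0], rq[1])
		fmt.Println(rq[0], rq[1], "->", code, body)
	}
}
```

The third request falls through to the 404 branch because its full URI, query string included, no longer equals the literal path.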

On top of these endpoints, the project also ships the dbg program for debugging.

bash-5.0$ /dbg
dbg is a tool for quickly inspecting the state of the nginx instance

Usage:
  dbg [command]

Available Commands:
  backends    Inspect the dynamically-loaded backends information
  certs       Inspect dynamic SSL certificates
  conf        Dump the contents of /etc/nginx/nginx.conf
  general     Output the general dynamic lua state
  help        Help about any command

Flags:
  -h, --help   help for dbg

Its source code is in main.go.

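Summary

  • The controller obtains Ingress, IngressClass, Endpoint, Secret, ConfigMap and Service objects from the API server, assembles from them the data needed for the NGINX configuration, and passes that data to the template to render nginx.conf.
  • Lua is used to update upstream nodes and certificates dynamically, avoiding the service jitter that frequent nginx -s reload runs would cause.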

PS

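Ingress was introduced in Kubernetes 1.1 and, after more than four years in beta, finally went GA in 1.19, which probably makes it one of the features that took longest to reach GA. In my view the main reason is this: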

The Ingress API has had a rough road getting to GA. It is an essential resource for many, and the changes that have been introduced help manage that complexity while keeping it relatively light-weight. However, even with the added flexibility that has been introduced it doesn’t cover a variety of complex use-cases.

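Ingress sits at the edge of the cluster and routes north-south traffic, so the performance of the controller itself is a major consideration, and building on a mature product such as NGINX is a safe choice. But NGINX was developed early, at a time when containers were nowhere near as widespread as they are now, and it was not designed with cloud-native use in mind. NGINX's rich configuration gives it great power and the flexibility to meet all sorts of requirements, but it also means that these settings are hard to abstract and standardize one by one. That is why the Ingress NGINX controller not only uses a ConfigMap to change NGINX's global configuration but also relies on a large number of annotations to change the configuration of each server. And because Kubernetes makes a strong compatibility commitment for official APIs that have reached GA, changes to the API have to be made with great care.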

Gateway API

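As noted above, Ingress solved the problem of having nothing at all, but it is not good enough: many parameters have to be set through annotations, which makes the configuration bloated and inflexible. Yet because it already has a large user base and has become the de facto standard, the project could only freeze the API more or less as it was and start afresh elsewhere.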

What’s Next for Ingress?

SIG Network has been working on a new API referred to as “Service APIs” that takes into account the lessons learned from the previous efforts of working on Ingress. These Service APIs are not intended to replace Ingress, but instead compliment it by providing several new resources that could enable more complex workflows.

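The new project was initially named "Service APIs" and was renamed "Gateway API" in February 2021. The new API borrows from Istio and provides a higher level of abstraction; see the official documentation for details.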

References

https://opensource.googleblog.com/2020/09/kubernetes-ingress-goes-ga.html