Tunnel

func (i *Initializer) Initialize() error {
	klog.Info("Setting up node network")

	// Error handling is elided below for brevity; wg is a sync.WaitGroup declared
	// earlier in the full function.
	i.initNodeLocalConfig()

	i.initializeIPSec()

	i.prepareHostNetwork()

	i.setupOVSBridge()

	// routeClient.Initialize() should be after i.setupOVSBridge() which
	// creates the host gateway interface.
	i.routeClient.Initialize(i.nodeConfig, wg.Done)

	// Install OpenFlow entries on the OVS bridge.
	i.initOpenFlowPipeline()

	return nil
}

OVSBridge

// setupOVSBridge sets up the OVS bridge and creates the host gateway interface and the tunnel port
func (i *Initializer) setupOVSBridge() error {
	i.ovsBridgeClient.Create()

	i.prepareOVSBridge()

	// Initialize interface cache
	i.initInterfaceStore()

	i.setupDefaultTunnelInterface()
  
	// Set up host gateway interface
	i.setupGatewayInterface()

	return nil
}

Creating the Tunnel Interface

func (i *Initializer) setupDefaultTunnelInterface() error {
	tunnelPortName := i.nodeConfig.DefaultTunName
	tunnelIface, portExists := i.ifaceStore.GetInterface(tunnelPortName)
	localIP := i.getTunnelPortLocalIP()
	localIPStr := ""
	if localIP != nil {
		localIPStr = localIP.String()
	}

	// Enabling UDP checksum can greatly improve the performance for Geneve and
	// VXLAN tunnels by triggering GRO on the receiver.
	shouldEnableCsum := i.networkConfig.TunnelType == ovsconfig.GeneveTunnel || i.networkConfig.TunnelType == ovsconfig.VXLANTunnel

	// Check the default tunnel port.
	if portExists {
		if i.networkConfig.TrafficEncapMode.SupportsEncap() &&
			tunnelIface.TunnelInterfaceConfig.Type == i.networkConfig.TunnelType &&
			tunnelIface.TunnelInterfaceConfig.LocalIP.Equal(localIP) {
			klog.V(2).Infof("Tunnel port %s already exists on OVS bridge", tunnelPortName)
			// This could happen when upgrading from previous versions that didn't set it.
			if shouldEnableCsum && !tunnelIface.TunnelInterfaceConfig.Csum {
				if err := i.enableTunnelCsum(tunnelPortName); err != nil {
					return fmt.Errorf("failed to enable csum for tunnel port %s: %v", tunnelPortName, err)
				}
				tunnelIface.TunnelInterfaceConfig.Csum = true
			}
			return nil
		}

		if err := i.ovsBridgeClient.DeletePort(tunnelIface.PortUUID); err != nil {
			if i.networkConfig.TrafficEncapMode.SupportsEncap() {
				return fmt.Errorf("failed to remove tunnel port %s with wrong tunnel type: %s", tunnelPortName, err)
			} else {
				klog.Errorf("Failed to remove tunnel port %s in NoEncapMode: %v", tunnelPortName, err)
			}
		} else {
			klog.Infof("Removed tunnel port %s with tunnel type: %s", tunnelPortName, tunnelIface.TunnelInterfaceConfig.Type)
			i.ifaceStore.DeleteInterface(tunnelIface)
		}
	}

	// Create the default tunnel port and interface.
	if i.networkConfig.TrafficEncapMode.SupportsEncap() {
		if tunnelPortName != defaultTunInterfaceName {
			// Reset the tunnel interface name to the desired name before
			// recreating the tunnel port and interface.
			tunnelPortName = defaultTunInterfaceName
			i.nodeConfig.DefaultTunName = tunnelPortName
		}
		tunnelPortUUID, err := i.ovsBridgeClient.CreateTunnelPortExt(tunnelPortName, i.networkConfig.TunnelType, config.DefaultTunOFPort, shouldEnableCsum, localIPStr, "", "", nil)
		if err != nil {
			klog.Errorf("Failed to create tunnel port %s type %s on OVS bridge: %v", tunnelPortName, i.networkConfig.TunnelType, err)
			return err
		}
		tunnelIface = interfacestore.NewTunnelInterface(tunnelPortName, i.networkConfig.TunnelType, localIP, shouldEnableCsum)
		tunnelIface.OVSPortConfig = &interfacestore.OVSPortConfig{PortUUID: tunnelPortUUID, OFPort: config.DefaultTunOFPort}
		i.ifaceStore.AddInterface(tunnelIface)
	}
	return nil
}

FlowPipeline

// initOpenFlowPipeline sets up necessary Openflow entries, including pipeline, classifiers, conn_track, and gateway flows
// Every time the agent is (re)started, we go through the following sequence:
//   1. agent determines the new round number (this is done by incrementing the round number
//   persisted in OVSDB, or if it's not available by picking round 1).
//   2. any existing flow for which the round number matches the round number obtained from step 1
//   is deleted.
//   3. all required flows are installed, using the round number obtained from step 1.
//   4. after convergence, all existing flows for which the round number matches the previous round
//   number (i.e. the round number which was persisted in OVSDB, if any) are deleted.
//   5. the new round number obtained from step 1 is persisted to OVSDB.
// The rationale for not persisting the new round number until after all previous flows have been
// deleted is to avoid a situation in which some stale flows are never deleted because of successive
// agent restarts (with the agent crashing before step 4 can be completed). With the sequence
// described above, we guarantee that at most two rounds of flows exist in the switch at any given
// time.
// Note that at the moment we assume that all OpenFlow groups are deleted every time there is an
// Antrea Agent restart. This allows us to add the necessary groups without having to worry about
// the operation failing because a (stale) group with the same ID already exists in OVS. This
// assumption is currently guaranteed by the ofnet implementation:
// https://github.com/wenyingd/ofnet/blob/14a78b27ef8762e45a0cfc858c4d07a4572a99d5/ofctrl/fgraphSwitch.go#L57-L62
// All previous groups have been deleted by the time the call to i.ofClient.Initialize returns.
func (i *Initializer) initOpenFlowPipeline() error {
	roundInfo := getRoundInfo(i.ovsBridgeClient)

	// Set up all basic flows.
	ofConnCh, err := i.ofClient.Initialize(roundInfo, i.nodeConfig, i.networkConfig.TrafficEncapMode)
	if err != nil {
		klog.Errorf("Failed to initialize openflow client: %v", err)
		return err
	}

	// On Windows platform, host network flows are needed for host traffic.
	if err := i.initHostNetworkFlows(); err != nil {
		klog.Errorf("Failed to install openflow entries for host network: %v", err)
		return err
	}

	// Install OpenFlow entries to enable Pod traffic to external IP
	// addresses.
	if err := i.ofClient.InstallExternalFlows(); err != nil {
		klog.Errorf("Failed to install openflow entries for external connectivity: %v", err)
		return err
	}

	// Set up flow entries for gateway interface, including classifier, skip spoof guard check,
	// L3 forwarding and L2 forwarding
	if err := i.ofClient.InstallGatewayFlows(); err != nil {
		klog.Errorf("Failed to setup openflow entries for gateway: %v", err)
		return err
	}

	if i.networkConfig.TrafficEncapMode.SupportsEncap() {
		// Set up flow entries for the default tunnel port interface.
		if err := i.ofClient.InstallDefaultTunnelFlows(); err != nil {
			klog.Errorf("Failed to setup openflow entries for tunnel interface: %v", err)
			return err
		}
	}

	if !i.enableProxy {
		// Set up flow entries to enable Service connectivity. Upstream kube-proxy is leveraged to
		// provide load-balancing, and the flows installed by this method ensure that traffic sent
		// from local Pods to any Service address can be forwarded to the host gateway interface
		// correctly. Otherwise packets might be dropped by egress rules before they are DNATed to
		// backend Pods.
		if err := i.ofClient.InstallClusterServiceCIDRFlows([]*net.IPNet{i.serviceCIDR, i.serviceCIDRv6}); err != nil {
			klog.Errorf("Failed to setup OpenFlow entries for Service CIDRs: %v", err)
			return err
		}
	} else {
		// Set up flow entries to enable Service connectivity. The agent proxy handles
		// ClusterIP Services while the upstream kube-proxy is leveraged to handle
		// any other kinds of Services.
		if err := i.ofClient.InstallClusterServiceFlows(); err != nil {
			klog.Errorf("Failed to setup default OpenFlow entries for ClusterIP Services: %v", err)
			return err
		}
	}

	go func() {
		// Delete stale flows from previous round. We need to wait long enough to ensure
		// that all the flows which are still required have received an updated cookie (with
		// the new round number), otherwise we would disrupt the dataplane. Unfortunately,
		// the time required for convergence may be large and there is no simple way to
		// determine when is a right time to perform the cleanup task.
		// TODO: introduce a deterministic mechanism through which the different entities
		//  responsible for installing flows can notify the agent that this deletion
		//  operation can take place. A waitGroup can be created here and notified when
		//  full sync in agent networkpolicy controller is complete. This would signal NP
		//  flows have been synced once. Other mechanisms are still needed for node flows
		//  fullSync check.
		time.Sleep(10 * time.Second)
		klog.Info("Deleting stale flows from previous round if any")
		if err := i.ofClient.DeleteStaleFlows(); err != nil {
			klog.Errorf("Error when deleting stale flows from previous round: %v", err)
			return
		}
		persistRoundNum(roundInfo.RoundNum, i.ovsBridgeClient, 1*time.Second, maxRetryForRoundNumSave)
	}()

	go func() {
		for {
			if _, ok := <-ofConnCh; !ok {
				return
			}
			klog.Info("Replaying OF flows to OVS bridge")
			i.ofClient.ReplayFlows()
			klog.Info("Flow replay completed")

			if i.ovsBridgeClient.GetOVSDatapathType() == ovsconfig.OVSDatapathNetdev {
				// we don't set flow-restore-wait when using the OVS netdev datapath
				return
			}

			// ofClient and ovsBridgeClient have their own mechanisms to restore connections with OVS, and it could
			// happen that ovsBridgeClient's connection is not ready when ofClient completes flow replay. We retry it
			// with a timeout that is longer than ovsBridgeClient's maximum connecting retry interval (8 seconds)
			// to ensure the flag can be removed successfully.
			err := wait.PollImmediate(200*time.Millisecond, 10*time.Second, func() (done bool, err error) {
				if err := i.FlowRestoreComplete(); err != nil {
					return false, nil
				}
				return true, nil
			})
			// This shouldn't happen unless OVS is disconnected again after replaying flows. If it happens, we will try
			// to clean up the config again so an error log should be fine.
			if err != nil {
				klog.Errorf("Failed to clean up flow-restore-wait config: %v", err)
			}
		}
	}()

	return nil
}
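
The round-number bookkeeping described in the comment above works by encoding the round into the OpenFlow cookie of every flow, so that stale flows from the previous round can be selected with a cookie/mask filter and deleted. Below is a minimal, self-contained sketch of the idea; the bit layout is illustrative and is not Antrea's actual cookie allocator.

package main

import "fmt"

// Illustrative layout: the upper bits of the 64-bit OpenFlow cookie carry the round
// number, the lower bits carry a per-category ID (not Antrea's real bit assignment).
const (
	roundBits  = 16
	roundShift = 64 - roundBits
	roundMask  = uint64(1<<roundBits-1) << roundShift
)

// cookieFor builds the cookie for a flow installed in the given round.
func cookieFor(round, categoryID uint64) uint64 {
	return round<<roundShift | categoryID
}

// staleFilter returns the cookie value and mask that select every flow installed in
// prevRound, regardless of category; deleting stale flows relies on such a filter.
func staleFilter(prevRound uint64) (cookie, mask uint64) {
	return prevRound << roundShift, roundMask
}

func main() {
	oldFlow := cookieFor(41, 0x2) // left over from the previous round
	newFlow := cookieFor(42, 0x2) // the same flow, re-installed with the new round number

	cookie, mask := staleFilter(41)
	fmt.Println("old flow stale:", oldFlow&mask == cookie) // true
	fmt.Println("new flow stale:", newFlow&mask == cookie) // false
}

When encap mode is enabled, InstallDefaultTunnelFlows (called above) installs the classifier and L2 forwarding calculation flows for the default tunnel port:
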
func (c *client) InstallDefaultTunnelFlows() error {
	flows := []binding.Flow{
		c.tunnelClassifierFlow(config.DefaultTunOFPort, cookie.Default),
		c.l2ForwardCalcFlow(globalVirtualMAC, config.DefaultTunOFPort, true, cookie.Default),
	}
	if err := c.ofEntryOperations.AddAll(flows); err != nil {
		return err
	}
	c.defaultTunnelFlows = flows
	return nil
}

CNI

- CmdAdd
  |- ipam.ExecIPAMAdd
  |- updateResultIfaceConfig
  |- updateResultDNSConfig
  |- s.podConfigurator.configureInterfaces
     |- pc.ifConfigurator.configureContainerLink
        |- configureContainerLinkSriov
     |- pc.connectInterfaceToOVS
     |- pc.ifConfigurator.advertiseContainerAddr
  |- return resultToResponse(result), nil

// configureContainerLinkSriov moves the VF into the container namespace and renames its
// representor on the host (error handling elided for brevity).
func (ic *ifConfigurator) configureContainerLinkSriov(
	podName string,
	podNamespace string,
	containerID string,
	containerNetNS string,
	containerIfaceName string,
	mtu int,
	pciAddress string,
	result *current.Result,
) error {
	hostIfaceName := util.GenerateContainerInterfaceName(podName, podNamespace, containerID)

	hostIface := &current.Interface{Name: hostIfaceName}
	containerIface := &current.Interface{Name: containerIfaceName, Sandbox: containerNetNS}
	// result's hostIface is set to the VF representor here.
	result.Interfaces = []*current.Interface{hostIface, containerIface}

	// 1. get VF netdevice from PCI
	vfNetdevices, err := sriovnet.GetNetDevicesFromPci(pciAddress)
	vfNetdevice := vfNetdevices[0]
  
	// 2. get Uplink netdevice
	uplink, err := sriovnet.GetUplinkRepresentor(pciAddress)
  
	// 3. get VF index from PCI
	vfIndex, err := sriovnet.GetVfIndexByPciAddress(pciAddress)

	// 4. lookup representor
	repPortName, err := sriovnet.GetVfRepresentor(uplink, vfIndex)

	// 5. rename VF representor to hostIfaceName
	renameLink(repPortName, hostIfaceName)
  
	hostIface.Name = hostIfaceName
	link, err := netlink.LinkByName(hostIface.Name)
	hostIface.Mac = link.Attrs().HardwareAddr.String()

	// 6. Move VF to Container namespace
	netns, err := ns.GetNS(containerNetNS)
	err = moveIfToNetns(vfNetdevice, netns)
	netns.Close()

	if err := ns.WithNetNSPath(containerNetNS, func(hostNS ns.NetNS) error {
		err = renameLink(vfNetdevice, containerIfaceName)
		link, err = netlink.LinkByName(containerIfaceName)
		err = netlink.LinkSetMTU(link, mtu)
		err = netlink.LinkSetUp(link)

		klog.V(2).Infof("Setup interfaces host: %s, container %s", repPortName, containerIfaceName)
		containerIface.Name = containerIfaceName
		containerIface.Mac = link.Attrs().HardwareAddr.String()
		containerIface.Sandbox = netns.Path()
		klog.V(2).Infof("Configuring IP address for container %s", containerID)
	
		// result.Interfaces must be set before this.
		ipam.ConfigureIface(containerIface.Name, result)
		return nil
	}); err != nil {
		return err
	}
	return nil
}
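
A note on the hostIfaceName generated at the top of configureContainerLinkSriov: it has to fit the 15-character Linux interface-name limit, so util.GenerateContainerInterfaceName derives a short deterministic name from the Pod identity. The sketch below shows the general hash-and-truncate pattern; it is illustrative only, not Antrea's exact implementation.

package main

import (
	"crypto/sha1"
	"encoding/hex"
	"fmt"
)

// interfaceNameLength is the usable Linux interface name length (IFNAMSIZ is 16,
// including the trailing NUL byte).
const interfaceNameLength = 15

// generateContainerInterfaceName illustrates the common pattern for deterministic
// host-side interface names: hash the Pod identity and keep a short readable prefix.
// It is a sketch, not Antrea's actual util.GenerateContainerInterfaceName.
func generateContainerInterfaceName(podName, podNamespace, containerID string) string {
	sum := sha1.Sum([]byte(podNamespace + "/" + podName + "/" + containerID))
	suffix := hex.EncodeToString(sum[:])[:6]
	prefixLen := interfaceNameLength - len(suffix) - 1
	prefix := podName
	if len(prefix) > prefixLen {
		prefix = prefix[:prefixLen]
	}
	return prefix + "-" + suffix
}

func main() {
	// Example: a long Pod name is truncated and suffixed with a stable hash.
	fmt.Println(generateContainerInterfaceName("nginx-66b6c48dd5-8xkzp", "default", "0123456789abcdef"))
}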

The hostIface that is connected to OVS here is the VF representor, which is also the ovsPortName on the OVS br-int bridge.

// connectInterfaceToOVS connects an existing interface to ovs br-int.
func (pc *podConfigurator) connectInterfaceToOVS(
	podName string,
	podNameSpace string,
	containerID string,
	hostIface *current.Interface,
	containerIface *current.Interface,
	ips []*current.IPConfig,
	containerAccess *containerAccessArbitrator,
) (*interfacestore.InterfaceConfig, error) {
	// Use the outer veth interface name as the OVS port name.
	ovsPortName := hostIface.Name
	containerConfig := buildContainerConfig(ovsPortName, containerID, podName, podNameSpace, containerIface, ips)
	return containerConfig, pc.connectInterfaceToOVSCommon(ovsPortName, containerConfig)
}

Next, let's look at how this hostIface is attached to the OVS bridge. Each time a Pod is created, an OVS port is also created for it and attached to the bridge.

func (pc *podConfigurator) connectInterfaceToOVSCommon(ovsPortName string, containerConfig *interfacestore.InterfaceConfig) error {
	// Create the OVS port and attach the container configuration as external_ids.
	containerID := containerConfig.ContainerID
	klog.V(2).Infof("Adding OVS port %s for container %s", ovsPortName, containerID)
	ovsAttachInfo := BuildOVSPortExternalIDs(containerConfig)
	portUUID, err := pc.createOVSPort(ovsPortName, ovsAttachInfo)

	// GetOFPort will wait for up to 1 second for OVSDB to report the OFPort number.
	ofPort, err := pc.ovsBridgeClient.GetOFPort(ovsPortName)

	klog.V(2).Infof("Setting up Openflow entries for container %s", containerID)
	err = pc.ofClient.InstallPodFlows(ovsPortName, containerConfig.IPs, containerConfig.MAC, uint32(ofPort))

	containerConfig.OVSPortConfig = &interfacestore.OVSPortConfig{PortUUID: portUUID, OFPort: ofPort}
	// Add containerConfig into local cache
	pc.ifaceStore.AddInterface(containerConfig)
	// Notify the Pod update event to required components.
	pc.entityUpdates <- types.EntityReference{
		Pod: &v1beta2.PodReference{Name: containerConfig.PodName, Namespace: containerConfig.PodNamespace},
	}
	return nil
}
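
BuildOVSPortExternalIDs, called above, attaches the container's identity to the OVS port as external_ids, which is what lets the agent rebuild its interface cache from OVSDB (for example after a restart). The following is a self-contained sketch of the idea; the key names are illustrative and not necessarily the exact strings Antrea writes.

package main

import "fmt"

// Illustrative external_ids keys; Antrea's actual constant values may differ.
const (
	extIDContainerID  = "container-id"
	extIDPodName      = "pod-name"
	extIDPodNamespace = "pod-namespace"
	extIDMAC          = "attached-mac"
	extIDIP           = "ip-address"
)

// buildOVSPortExternalIDs mirrors the idea behind BuildOVSPortExternalIDs: record enough
// Pod metadata on the OVS port that the interface cache can be reconstructed from OVSDB.
func buildOVSPortExternalIDs(containerID, podName, podNamespace, mac, ips string) map[string]interface{} {
	return map[string]interface{}{
		extIDContainerID:  containerID,
		extIDPodName:      podName,
		extIDPodNamespace: podNamespace,
		extIDMAC:          mac,
		extIDIP:           ips,
	}
}

func main() {
	fmt.Println(buildOVSPortExternalIDs("0123456789ab", "nginx-0", "default", "aa:bb:cc:dd:ee:ff", "10.10.0.5"))
}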

InstallPodFlows is also called here to set up the flow entries. Let's look at what it does; the interfaceName here is the name of the VF representor:

func (c *client) InstallPodFlows(interfaceName string, podInterfaceIPs []net.IP, podInterfaceMAC net.HardwareAddr, ofPort uint32) error {
	c.replayMutex.RLock()
	defer c.replayMutex.RUnlock()

	localGatewayMAC := c.nodeConfig.GatewayConfig.MAC
	flows := []binding.Flow{
		c.podClassifierFlow(ofPort, cookie.Pod),
		c.l2ForwardCalcFlow(podInterfaceMAC, ofPort, false, cookie.Pod),
	}

	// Add support for IPv4 ARP responder.
	podInterfaceIPv4 := util.GetIPv4Addr(podInterfaceIPs)
	if podInterfaceIPv4 != nil {
		flows = append(flows, c.arpSpoofGuardFlow(podInterfaceIPv4, podInterfaceMAC, ofPort, cookie.Pod))
	}
	// Add IP SpoofGuard flows for all valid IPs.
	flows = append(flows, c.podIPSpoofGuardFlow(podInterfaceIPs, podInterfaceMAC, ofPort, cookie.Pod)...)
	// Add L3 routing flows to rewrite the Pod's dst MAC for all valid IPs.
	flows = append(flows, c.l3FwdFlowToPod(localGatewayMAC, podInterfaceIPs, podInterfaceMAC, cookie.Pod)...)

	if c.encapMode.IsNetworkPolicyOnly() {
		// In policy-only mode, traffic to local Pod is routed based on destination IP.
		flows = append(flows,
			c.l3FwdFlowRouteToPod(podInterfaceIPs, podInterfaceMAC, cookie.Pod)...,
		)
	}
	return c.addFlows(c.podFlowCache, interfaceName, flows)
}

The flow entries added here are:

// podClassifierFlow generates the flow to mark traffic coming from the podOFPort.
func (c *client) podClassifierFlow(podOFPort uint32, category cookie.Category) binding.Flow {
	classifierTable := c.pipeline[ClassifierTable]
	return classifierTable.BuildFlow(priorityLow).
		MatchInPort(podOFPort).
		Action().LoadRegRange(int(marksReg), markTrafficFromLocal, binding.Range{0, 15}).
		Action().GotoTable(classifierTable.GetNext()).
		Cookie(c.cookieAllocator.Request(category).Raw()).
		Done()
}

// l2ForwardCalcFlow generates the flow that matches dst MAC and loads ofPort to reg.
func (c *client) l2ForwardCalcFlow(dstMAC net.HardwareAddr, ofPort uint32, skipIngressRules bool, category cookie.Category) binding.Flow {
	l2FwdCalcTable := c.pipeline[l2ForwardingCalcTable]
	nextTable := l2FwdCalcTable.GetNext()
	if !skipIngressRules {
		// Go to ingress NetworkPolicy tables for traffic to local Pods.
		nextTable = c.ingressEntryTable
	}
	return l2FwdCalcTable.BuildFlow(priorityNormal).
		MatchDstMAC(dstMAC).
		Action().LoadRegRange(int(PortCacheReg), ofPort, ofPortRegRange).
		Action().LoadRegRange(int(marksReg), portFoundMark, ofPortMarkRange).
		Action().GotoTable(nextTable).
		Cookie(c.cookieAllocator.Request(category).Raw()).
		Done()
	// Broadcast, multicast, and unknown unicast packets will be dropped by
	// the default flow of L2ForwardingOutTable.
}

// arpSpoofGuardFlow generates the flow to check ARP traffic sent out from local Pod interfaces.
func (c *client) arpSpoofGuardFlow(ifIP net.IP, ifMAC net.HardwareAddr, ifOFPort uint32, category cookie.Category) binding.Flow {
	return c.pipeline[spoofGuardTable].BuildFlow(priorityNormal).MatchProtocol(binding.ProtocolARP).
		MatchInPort(ifOFPort).
		MatchARPSha(ifMAC).
		MatchARPSpa(ifIP).
		Action().GotoTable(arpResponderTable).
		Cookie(c.cookieAllocator.Request(category).Raw()).
		Done()
}

// podIPSpoofGuardFlow generates the flow to check IP traffic sent out from local pod. Traffic from host gateway interface
// will not be checked, since it might be pod to service traffic or host namespace traffic.
func (c *client) podIPSpoofGuardFlow(ifIPs []net.IP, ifMAC net.HardwareAddr, ifOFPort uint32, category cookie.Category) []binding.Flow {
	ipPipeline := c.pipeline
	ipSpoofGuardTable := ipPipeline[spoofGuardTable]
	var flows []binding.Flow
	for _, ifIP := range ifIPs {
		ipProtocol := getIPProtocol(ifIP)
		if ipProtocol == binding.ProtocolIP {
			flows = append(flows, ipSpoofGuardTable.BuildFlow(priorityNormal).MatchProtocol(ipProtocol).
				MatchInPort(ifOFPort).
				MatchSrcMAC(ifMAC).
				MatchSrcIP(ifIP).
				Action().GotoTable(ipSpoofGuardTable.GetNext()).
				Cookie(c.cookieAllocator.Request(category).Raw()).
				Done())
		} else if ipProtocol == binding.ProtocolIPv6 {
			flows = append(flows, ipSpoofGuardTable.BuildFlow(priorityNormal).MatchProtocol(ipProtocol).
				MatchInPort(ifOFPort).
				MatchSrcMAC(ifMAC).
				MatchSrcIP(ifIP).
				Action().GotoTable(ipv6Table).
				Cookie(c.cookieAllocator.Request(category).Raw()).
				Done())
		}
	}
	return flows
}

// l3FwdFlowToPod generates the L3 forward flows for traffic from tunnel to a
// local Pod. It rewrites the destination MAC (should be globalVirtualMAC) to
// the Pod interface MAC, and rewrites the source MAC to the gateway interface
// MAC.
func (c *client) l3FwdFlowToPod(localGatewayMAC net.HardwareAddr, podInterfaceIPs []net.IP, podInterfaceMAC net.HardwareAddr, category cookie.Category) []binding.Flow {
	l3FwdTable := c.pipeline[l3ForwardingTable]
	var flows []binding.Flow
	for _, ip := range podInterfaceIPs {
		ipProtocol := getIPProtocol(ip)
		flows = append(flows, l3FwdTable.BuildFlow(priorityNormal).MatchProtocol(ipProtocol).
			MatchRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange).
			MatchDstIP(ip).
			Action().SetSrcMAC(localGatewayMAC).
			// Rewrite src MAC to local gateway MAC, and rewrite dst MAC to pod MAC
			Action().SetDstMAC(podInterfaceMAC).
			Action().GotoTable(l3DecTTLTable).
			Cookie(c.cookieAllocator.Request(category).Raw()).
			Done())
	}
	return flows
}

// l3FwdFlowRouteToPod generates the flows to route the traffic to a Pod based on
// the destination IP. It rewrites the destination MAC of the packets to the Pod
// interface MAC. The flow is used in the networkPolicyOnly mode for the traffic
// from the gateway to a local Pod.
func (c *client) l3FwdFlowRouteToPod(podInterfaceIPs []net.IP, podInterfaceMAC net.HardwareAddr, category cookie.Category) []binding.Flow {
	l3FwdTable := c.pipeline[l3ForwardingTable]
	var flows []binding.Flow
	for _, ip := range podInterfaceIPs {
		ipProtocol := getIPProtocol(ip)
		flows = append(flows, l3FwdTable.BuildFlow(priorityNormal).MatchProtocol(ipProtocol).
			MatchDstIP(ip).
			Action().SetDstMAC(podInterfaceMAC).
			Action().GotoTable(l3DecTTLTable).
			Cookie(c.cookieAllocator.Request(category).Raw()).
			Done())
	}
	return flows
}

References