// initOpenFlowPipeline sets up the necessary OpenFlow entries, including pipeline, classifiers, conn_track, and gateway flows.
// Every time the agent is (re)started, we go through the following sequence:
// 1. agent determines the new round number (this is done by incrementing the round number
// persisted in OVSDB, or if it's not available by picking round 1).
// 2. any existing flow for which the round number matches the round number obtained from step 1
// is deleted.
// 3. all required flows are installed, using the round number obtained from step 1.
// 4. after convergence, all existing flows for which the round number matches the previous round
// number (i.e. the round number which was persisted in OVSDB, if any) are deleted.
// 5. the new round number obtained from step 1 is persisted to OVSDB.
// The rationale for not persisting the new round number until after all previous flows have been
// deleted is to avoid a situation in which some stale flows are never deleted because of successive
// agent restarts (with the agent crashing before step 4 can be completed). With the sequence
// described above, we guarantee that at most two rounds of flows exist in the switch at any given
// time.
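// As an illustration of how rounds can be tracked (a hedged sketch with
// hypothetical names and bit layout, not the exact Antrea cookie scheme), the
// round number can be encoded in the upper bits of every flow cookie, so that
// all flows of a given round can be deleted with a single cookie-masked delete:
//
//	const roundShift = 48                 // hypothetical bit offset
//	cookie := uint64(round) << roundShift // round number carried in the cookie
//	mask := uint64(0xffff) << roundShift  // match only the round bits
//	deleteFlowsByCookie(cookie, mask)     // hypothetical helper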
// Note that at the moment we assume that all OpenFlow groups are deleted every time there is an
// Antrea Agent restart. This allows us to add the necessary groups without having to worry about
// the operation failing because a (stale) group with the same ID already exists in OVS. This
// assumption is currently guaranteed by the ofnet implementation:
// https://github.com/wenyingd/ofnet/blob/14a78b27ef8762e45a0cfc858c4d07a4572a99d5/ofctrl/fgraphSwitch.go#L57-L62
// All previous groups have been deleted by the time the call to i.ofClient.Initialize returns.
func (i *Initializer) initOpenFlowPipeline() error {
roundInfo := getRoundInfo(i.ovsBridgeClient)
// Set up all basic flows.
ofConnCh, err := i.ofClient.Initialize(roundInfo, i.nodeConfig, i.networkConfig.TrafficEncapMode)
if err != nil {
klog.Errorf("Failed to initialize openflow client: %v", err)
return err
}
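// ofConnCh receives a notification every time the OpenFlow connection to OVS is
// re-established; it is consumed by the goroutine below to replay flows after an
// OVS restart.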
// On Windows, host network flows are needed to support host traffic.
if err := i.initHostNetworkFlows(); err != nil {
klog.Errorf("Failed to install OpenFlow entries for host network: %v", err)
return err
}
// Install OpenFlow entries to enable Pod traffic to external IP
// addresses.
if err := i.ofClient.InstallExternalFlows(); err != nil {
klog.Errorf("Failed to install openflow entries for external connectivity: %v", err)
return err
}
// Set up flow entries for the gateway interface, including classifier, spoof
// guard skip, L3 forwarding, and L2 forwarding flows.
if err := i.ofClient.InstallGatewayFlows(); err != nil {
klog.Errorf("Failed to set up OpenFlow entries for gateway: %v", err)
return err
}
if i.networkConfig.TrafficEncapMode.SupportsEncap() {
// Set up flow entries for the default tunnel port interface.
if err := i.ofClient.InstallDefaultTunnelFlows(); err != nil {
klog.Errorf("Failed to setup openflow entries for tunnel interface: %v", err)
return err
}
}
if !i.enableProxy {
// Set up flow entries to enable Service connectivity. Upstream kube-proxy is leveraged to
// provide load-balancing, and the flows installed by this method ensure that traffic sent
// from local Pods to any Service address can be forwarded to the host gateway interface
// correctly. Otherwise packets might be dropped by egress rules before they are DNATed to
// backend Pods.
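// As a rough illustration (a hedged sketch, not the exact match fields or
// actions programmed by InstallClusterServiceCIDRFlows), such a flow matches
// traffic destined to a Service CIDR and forwards it to the gateway port:
//
//	ip,nw_dst=<Service CIDR> actions=output:<gateway ofport>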
if err := i.ofClient.InstallClusterServiceCIDRFlows([]*net.IPNet{i.serviceCIDR, i.serviceCIDRv6}); err != nil {
klog.Errorf("Failed to setup OpenFlow entries for Service CIDRs: %v", err)
return err
}
} else {
// Set up flow entries to enable Service connectivity. The agent proxy handles
// ClusterIP Services while the upstream kube-proxy is leveraged to handle
// any other kinds of Services.
if err := i.ofClient.InstallClusterServiceFlows(); err != nil {
klog.Errorf("Failed to setup default OpenFlow entries for ClusterIP Services: %v", err)
return err
}
}
go func() {
// Delete stale flows from the previous round. We need to wait long enough to
// ensure that all the flows which are still required have received an updated
// cookie (with the new round number); otherwise we would disrupt the dataplane.
// Unfortunately, the time required for convergence may be large and there is no
// simple way to determine when it is the right time to perform the cleanup task.
// TODO: introduce a deterministic mechanism through which the different entities
// responsible for installing flows can notify the agent that this deletion
// operation can take place. For example, a WaitGroup could be created here and
// notified when the full sync of the agent's NetworkPolicy controller is
// complete; this would signal that the NetworkPolicy flows have been synced
// once. Other mechanisms would still be needed for the full sync check of Node
// flows. A minimal sketch of the idea follows.
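// A hedged sketch (names are hypothetical, and the Done() call would live in
// the NetworkPolicy controller rather than here):
//
//	var npFullSync sync.WaitGroup
//	npFullSync.Add(1)
//	// ... the NetworkPolicy controller calls npFullSync.Done() after its
//	// first full sync ...
//	npFullSync.Wait() // would replace the fixed sleep below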
time.Sleep(10 * time.Second)
klog.Info("Deleting stale flows from previous round if any")
if err := i.ofClient.DeleteStaleFlows(); err != nil {
klog.Errorf("Error when deleting stale flows from previous round: %v", err)
return
}
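// Step 5 of the startup sequence described above: persist the new round number
// to OVSDB; the retry interval and maximum retry count are passed to
// persistRoundNum.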
persistRoundNum(roundInfo.RoundNum, i.ovsBridgeClient, 1*time.Second, maxRetryForRoundNumSave)
}()
go func() {
for {
if _, ok := <-ofConnCh; !ok {
return
}
klog.Info("Replaying OF flows to OVS bridge")
i.ofClient.ReplayFlows()
klog.Info("Flow replay completed")
if i.ovsBridgeClient.GetOVSDatapathType() == ovsconfig.OVSDatapathNetdev {
// We don't set flow-restore-wait when using the OVS netdev datapath.
return
}
// ofClient and ovsBridgeClient have their own mechanisms to restore their
// connections with OVS, and it could happen that ovsBridgeClient's connection
// is not ready when ofClient completes the flow replay. We retry with a timeout
// that is longer than ovsBridgeClient's maximum connection retry interval
// (8 seconds) to ensure the flag can be removed successfully.
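// While the flow-restore-wait config is set on the bridge, OVS does not send
// or receive packets to or from the datapath; FlowRestoreComplete removes the
// flag once the flows have been replayed.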
err := wait.PollImmediate(200*time.Millisecond, 10*time.Second, func() (done bool, err error) {
if err := i.FlowRestoreComplete(); err != nil {
return false, nil
}
return true, nil
})
// This shouldn't happen unless OVS is disconnected again after the flows have
// been replayed. If it does happen, we will try to clean up the config again on
// the next replay, so an error log should be fine.
if err != nil {
klog.Errorf("Failed to clean up flow-restore-wait config: %v", err)
}
}
}()
return nil
}