Skip to content

Commit ed38c30

Browse files
authored
feat(abort): shifting chaosengine updation & event creation to experiment pod (#294)
Signed-off-by: shubhamchaudhary <[email protected]>
1 parent fafec1a commit ed38c30

2 files changed

Lines changed: 150 additions & 96 deletions

File tree

chaoslib/litmus/network-chaos/helper/netem.go

Lines changed: 112 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,26 @@ import (
1111
"syscall"
1212
"time"
1313

14-
"github.com/litmuschaos/litmus-go/chaoslib/litmus/network_latency/tc"
1514
clients "github.com/litmuschaos/litmus-go/pkg/clients"
1615
"github.com/litmuschaos/litmus-go/pkg/events"
1716
experimentEnv "github.com/litmuschaos/litmus-go/pkg/generic/network-chaos/environment"
1817
experimentTypes "github.com/litmuschaos/litmus-go/pkg/generic/network-chaos/types"
1918
"github.com/litmuschaos/litmus-go/pkg/log"
2019
"github.com/litmuschaos/litmus-go/pkg/result"
2120
"github.com/litmuschaos/litmus-go/pkg/types"
21+
"github.com/litmuschaos/litmus-go/pkg/utils/common"
2222
"github.com/pkg/errors"
2323
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2424
clientTypes "k8s.io/apimachinery/pkg/types"
2525
)
2626

27+
const (
28+
qdiscNotFound = "Cannot delete qdisc with handle of zero"
29+
qdiscNoFileFound = "RTNETLINK answers: No such file or directory"
30+
)
31+
2732
var err error
33+
var inject, abort chan os.Signal
2834

2935
func main() {
3036

@@ -34,6 +40,16 @@ func main() {
3440
chaosDetails := types.ChaosDetails{}
3541
resultDetails := types.ResultDetails{}
3642

43+
// inject channel is used to transmit signal notifications.
44+
inject = make(chan os.Signal, 1)
45+
// Catch and relay certain signal(s) to inject channel.
46+
signal.Notify(inject, os.Interrupt, syscall.SIGTERM, syscall.SIGKILL)
47+
48+
// abort channel is used to transmit signal notifications.
49+
abort = make(chan os.Signal, 1)
50+
// Catch and relay certain signal(s) to abort channel.
51+
signal.Notify(abort, os.Interrupt, syscall.SIGTERM, syscall.SIGKILL)
52+
3753
//Getting kubeConfig and Generate ClientSets
3854
if err := clients.GenerateClientSetFromKubeConfig(); err != nil {
3955
log.Fatalf("Unable to Get the kubeconfig, err: %v", err)
@@ -79,8 +95,8 @@ func PreparePodNetworkChaos(experimentsDetails *experimentTypes.ExperimentDetail
7995
events.GenerateEvents(eventsDetails, clients, chaosDetails, "ChaosEngine")
8096
}
8197

82-
var endTime <-chan time.Time
83-
timeDelay := time.Duration(experimentsDetails.ChaosDuration) * time.Second
98+
// watching for the abort signal and revert the chaos
99+
go abortWatcher(targetPID)
84100

85101
// injecting network chaos inside target container
86102
if err = InjectChaos(experimentsDetails, targetPID); err != nil {
@@ -89,46 +105,12 @@ func PreparePodNetworkChaos(experimentsDetails *experimentTypes.ExperimentDetail
89105

90106
log.Infof("[Chaos]: Waiting for %vs", experimentsDetails.ChaosDuration)
91107

92-
// signChan channel is used to transmit signal notifications.
93-
signChan := make(chan os.Signal, 1)
94-
// Catch and relay certain signal(s) to signChan channel.
95-
signal.Notify(signChan, os.Interrupt, syscall.SIGTERM, syscall.SIGKILL)
96-
97-
loop:
98-
for {
99-
endTime = time.After(timeDelay)
100-
select {
101-
case <-signChan:
102-
log.Info("[Chaos]: Killing process started because of terminated signal received")
103-
if err = tc.Killnetem(targetPID); err != nil {
104-
log.Errorf("unable to kill netem process, err :%v", err)
105-
106-
}
107-
// updating the chaosresult after stopped
108-
failStep := "Network Chaos injection stopped!"
109-
types.SetResultAfterCompletion(resultDetails, "Stopped", "Stopped", failStep)
110-
result.ChaosResult(chaosDetails, clients, resultDetails, "EOT")
111-
112-
// generating summary event in chaosengine
113-
msg := experimentsDetails.ExperimentName + " experiment has been aborted"
114-
types.SetEngineEventAttributes(eventsDetails, types.Summary, msg, "Warning", chaosDetails)
115-
events.GenerateEvents(eventsDetails, clients, chaosDetails, "ChaosEngine")
116-
117-
// generating summary event in chaosresult
118-
types.SetResultEventAttributes(eventsDetails, types.StoppedVerdict, msg, "Warning", resultDetails)
119-
events.GenerateEvents(eventsDetails, clients, chaosDetails, "ChaosResult")
120-
os.Exit(1)
121-
case <-endTime:
122-
log.Infof("[Chaos]: Time is up for experiment: %v", experimentsDetails.ExperimentName)
123-
endTime = nil
124-
break loop
125-
}
126-
}
108+
common.WaitForDuration(experimentsDetails.ChaosDuration)
127109

128110
log.Info("[Chaos]: Stopping the experiment")
129111

130112
// cleaning the netem process after chaos injection
131-
if err = tc.Killnetem(targetPID); err != nil {
113+
if err = Killnetem(targetPID); err != nil {
132114
return err
133115
}
134116

@@ -307,68 +289,75 @@ func InjectChaos(experimentDetails *experimentTypes.ExperimentDetails, pid int)
307289
netemCommands := os.Getenv("NETEM_COMMAND")
308290
destinationIPs := os.Getenv("DESTINATION_IPS")
309291

310-
if destinationIPs == "" {
311-
tc := fmt.Sprintf("sudo nsenter -t %d -n tc qdisc add dev %s root netem %v", pid, experimentDetails.NetworkInterface, netemCommands)
312-
cmd := exec.Command("/bin/bash", "-c", tc)
313-
out, err := cmd.CombinedOutput()
314-
log.Info(cmd.String())
315-
if err != nil {
316-
log.Error(string(out))
317-
return err
318-
}
319-
} else {
320-
321-
ips := strings.Split(destinationIPs, ",")
322-
var uniqueIps []string
292+
select {
293+
case <-inject:
294+
// stopping the chaos execution, if abort signal received
295+
os.Exit(1)
296+
default:
323297

324-
// removing duplicates ips from the list, if any
325-
for i := range ips {
326-
isPresent := false
327-
for j := range uniqueIps {
328-
if ips[i] == uniqueIps[j] {
329-
isPresent = true
330-
}
331-
}
332-
if !isPresent {
333-
uniqueIps = append(uniqueIps, ips[i])
298+
if destinationIPs == "" {
299+
tc := fmt.Sprintf("sudo nsenter -t %d -n tc qdisc add dev %s root netem %v", pid, experimentDetails.NetworkInterface, netemCommands)
300+
cmd := exec.Command("/bin/bash", "-c", tc)
301+
out, err := cmd.CombinedOutput()
302+
log.Info(cmd.String())
303+
if err != nil {
304+
log.Error(string(out))
305+
return err
334306
}
307+
} else {
308+
309+
ips := strings.Split(destinationIPs, ",")
310+
var uniqueIps []string
311+
312+
// removing duplicates ips from the list, if any
313+
for i := range ips {
314+
isPresent := false
315+
for j := range uniqueIps {
316+
if ips[i] == uniqueIps[j] {
317+
isPresent = true
318+
}
319+
}
320+
if !isPresent {
321+
uniqueIps = append(uniqueIps, ips[i])
322+
}
335323

336-
}
324+
}
337325

338-
// Create a priority-based queue
339-
// This instantly creates classes 1:1, 1:2, 1:3
340-
priority := fmt.Sprintf("sudo nsenter -t %v -n tc qdisc add dev %v root handle 1: prio", pid, experimentDetails.NetworkInterface)
341-
cmd := exec.Command("/bin/bash", "-c", priority)
342-
out, err := cmd.CombinedOutput()
343-
log.Info(cmd.String())
344-
if err != nil {
345-
log.Error(string(out))
346-
return err
347-
}
326+
// Create a priority-based queue
327+
// This instantly creates classes 1:1, 1:2, 1:3
328+
priority := fmt.Sprintf("sudo nsenter -t %v -n tc qdisc add dev %v root handle 1: prio", pid, experimentDetails.NetworkInterface)
329+
cmd := exec.Command("/bin/bash", "-c", priority)
330+
out, err := cmd.CombinedOutput()
331+
log.Info(cmd.String())
332+
if err != nil {
333+
log.Error(string(out))
334+
return err
335+
}
348336

349-
// Add queueing discipline for 1:3 class.
350-
// No traffic is going through 1:3 yet
351-
traffic := fmt.Sprintf("sudo nsenter -t %v -n tc qdisc add dev %v parent 1:3 netem %v", pid, experimentDetails.NetworkInterface, netemCommands)
352-
cmd = exec.Command("/bin/bash", "-c", traffic)
353-
out, err = cmd.CombinedOutput()
354-
log.Info(cmd.String())
355-
if err != nil {
356-
log.Error(string(out))
357-
return err
358-
}
337+
// Add queueing discipline for 1:3 class.
338+
// No traffic is going through 1:3 yet
339+
traffic := fmt.Sprintf("sudo nsenter -t %v -n tc qdisc add dev %v parent 1:3 netem %v", pid, experimentDetails.NetworkInterface, netemCommands)
340+
cmd = exec.Command("/bin/bash", "-c", traffic)
341+
out, err = cmd.CombinedOutput()
342+
log.Info(cmd.String())
343+
if err != nil {
344+
log.Error(string(out))
345+
return err
346+
}
359347

360-
for _, ip := range uniqueIps {
361-
362-
// redirect traffic to specific IP through band 3
363-
// It allows ipv4 addresses only
364-
if !strings.Contains(ip, ":") {
365-
tc := fmt.Sprintf("sudo nsenter -t %v -n tc filter add dev %v protocol ip parent 1:0 prio 3 u32 match ip dst %v flowid 1:3", pid, experimentDetails.NetworkInterface, ip)
366-
cmd = exec.Command("/bin/bash", "-c", tc)
367-
out, err = cmd.CombinedOutput()
368-
log.Info(cmd.String())
369-
if err != nil {
370-
log.Error(string(out))
371-
return err
348+
for _, ip := range uniqueIps {
349+
350+
// redirect traffic to specific IP through band 3
351+
// It allows ipv4 addresses only
352+
if !strings.Contains(ip, ":") {
353+
tc := fmt.Sprintf("sudo nsenter -t %v -n tc filter add dev %v protocol ip parent 1:0 prio 3 u32 match ip dst %v flowid 1:3", pid, experimentDetails.NetworkInterface, ip)
354+
cmd = exec.Command("/bin/bash", "-c", tc)
355+
out, err = cmd.CombinedOutput()
356+
log.Info(cmd.String())
357+
if err != nil {
358+
log.Error(string(out))
359+
return err
360+
}
372361
}
373362
}
374363
}
@@ -386,9 +375,13 @@ func Killnetem(PID int) error {
386375

387376
if err != nil {
388377
log.Error(string(out))
378+
// ignoring err if qdisc process doesn't exist inside the target container
379+
if strings.Contains(string(out), qdiscNotFound) || strings.Contains(string(out), qdiscNoFileFound) {
380+
log.Warn("The network chaos process has already been removed")
381+
return nil
382+
}
389383
return err
390384
}
391-
392385
return nil
393386
}
394387

@@ -418,3 +411,26 @@ func Getenv(key string, defaultValue string) string {
418411
}
419412
return value
420413
}
414+
415+
// abortWatcher continuosly watch for the abort signals
416+
func abortWatcher(targetPID int) {
417+
418+
for {
419+
select {
420+
case <-abort:
421+
log.Info("[Chaos]: Killing process started because of terminated signal received")
422+
log.Info("Chaos Revert Started")
423+
// retry thrice for the chaos revert
424+
retry := 3
425+
for retry > 0 {
426+
if err = Killnetem(targetPID); err != nil {
427+
log.Errorf("unable to kill netem process, err :%v", err)
428+
}
429+
retry--
430+
time.Sleep(1 * time.Second)
431+
}
432+
log.Info("Chaos Revert Completed")
433+
os.Exit(1)
434+
}
435+
}
436+
}

chaoslib/litmus/network-chaos/lib/network-chaos.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,17 @@ package lib
22

33
import (
44
"net"
5+
"os"
6+
"os/signal"
57
"strconv"
68
"strings"
9+
"syscall"
710

811
clients "github.com/litmuschaos/litmus-go/pkg/clients"
12+
"github.com/litmuschaos/litmus-go/pkg/events"
913
experimentTypes "github.com/litmuschaos/litmus-go/pkg/generic/network-chaos/types"
1014
"github.com/litmuschaos/litmus-go/pkg/log"
15+
"github.com/litmuschaos/litmus-go/pkg/result"
1116
"github.com/litmuschaos/litmus-go/pkg/status"
1217
"github.com/litmuschaos/litmus-go/pkg/types"
1318
"github.com/litmuschaos/litmus-go/pkg/utils/common"
@@ -70,6 +75,8 @@ func PrepareAndInjectChaos(experimentsDetails *experimentTypes.ExperimentDetails
7075
}
7176
}
7277

78+
go abortWatcher(resultDetails, chaosDetails, clients, eventsDetails, experimentsDetails)
79+
7380
if experimentsDetails.Sequence == "serial" {
7481
if err = InjectChaosInSerialMode(experimentsDetails, targetPodList, clients, chaosDetails, args); err != nil {
7582
return err
@@ -371,3 +378,34 @@ func GetIpsForTargetHosts(targetHosts string) string {
371378
}
372379
return strings.Join(commaSeparatedIPs, ",")
373380
}
381+
382+
// abortWatcher continuosly watch for the abort signals
383+
// it will update the chaosresult
384+
func abortWatcher(resultDetails *types.ResultDetails, chaosDetails *types.ChaosDetails, clients clients.ClientSets, eventsDetails *types.EventDetails, experimentsDetails *experimentTypes.ExperimentDetails) {
385+
386+
// signChan channel is used to transmit signal notifications.
387+
signChan := make(chan os.Signal, 1)
388+
// Catch and relay certain signal(s) to signChan channel.
389+
signal.Notify(signChan, os.Interrupt, syscall.SIGTERM, syscall.SIGKILL)
390+
391+
for {
392+
select {
393+
case <-signChan:
394+
log.Info("termination signal recieved, updating chaos status")
395+
// updating the chaosresult after stopped
396+
failStep := "Network Chaos injection stopped!"
397+
types.SetResultAfterCompletion(resultDetails, "Stopped", "Stopped", failStep)
398+
result.ChaosResult(chaosDetails, clients, resultDetails, "EOT")
399+
400+
// generating summary event in chaosengine
401+
msg := experimentsDetails.ExperimentName + " experiment has been aborted"
402+
types.SetEngineEventAttributes(eventsDetails, types.Summary, msg, "Warning", chaosDetails)
403+
events.GenerateEvents(eventsDetails, clients, chaosDetails, "ChaosEngine")
404+
405+
// generating summary event in chaosresult
406+
types.SetResultEventAttributes(eventsDetails, types.StoppedVerdict, msg, "Warning", resultDetails)
407+
events.GenerateEvents(eventsDetails, clients, chaosDetails, "ChaosResult")
408+
os.Exit(1)
409+
}
410+
}
411+
}

0 commit comments

Comments
 (0)