Skip to content

Commit 4f14a28

Browse files
authored
Implement TestMetadata_Subscriptions E2E test (knative#3324)
* Implement TestMetadata_Subscriptions E2E test This test verifies the full Knative Eventing flow using func subscribe: - Creates a producer function that sends CloudEvents to the broker - Creates a subscriber function that receives events via a Trigger - Validates the complete event delivery pipeline Fixes knative#3202 * Fix subscribe command verbose flag compatibility The subscribe command doesn't support the -v (verbose) flag that newCmd() automatically adds when FUNC_E2E_VERBOSE=true. This causes test failures. Changed to use exec.Command directly for the subscribe call to bypass the automatic verbose flag addition while keeping it for other commands. * Add Knative Eventing availability check to subscription test Check for broker-ingress service before running the test to gracefully skip if Knative Eventing is not installed or not ready yet. * Add broker-ingress service availability check The test was failing in CI because it tried to send events before the broker-ingress service was registered in DNS. This adds a retry loop to wait for the service to be available before proceeding. Fixes timing issues in fresh cluster environments. * fix: improve subscription test with webhook callback verification - Remove unused sendEventToBroker* functions - Use callback pattern for reliable event verification - Clean up comments and reduce code size * refactor: simplify event subscription e2e test by removing custom producer and callback server, and using stern to verify event reception.
1 parent 1630700 commit 4f14a28

1 file changed

Lines changed: 221 additions & 8 deletions

File tree

e2e/e2e_metadata_test.go

Lines changed: 221 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,23 @@
44
package e2e
55

66
import (
7+
"bufio"
78
"bytes"
9+
"context"
810
"encoding/json"
911
"fmt"
12+
"io"
13+
"net/http"
1014
"os"
15+
"os/exec"
1116
"path/filepath"
17+
"regexp"
18+
"strings"
1219
"testing"
20+
"time"
1321

1422
fn "knative.dev/func/pkg/functions"
23+
fnhttp "knative.dev/func/pkg/http"
1524
)
1625

1726
// ---------------------------------------------------------------------------
@@ -552,13 +561,217 @@ func Handle(w http.ResponseWriter, _ *http.Request) {
552561
}
553562
}
554563

555-
// TODO: TestMetadata_Subscriptions ensures that function instances can be
556-
// subscribed to events.
564+
// Tests the complete event flow using func subscribe
557565
func TestMetadata_Subscriptions(t *testing.T) {
558-
// TODO
559-
// Create a function which emits an event with as much defaults as possible
560-
// Create a function which subscribes to those events
561-
// Succeed the test as soon as it receives the event
562-
// https://github.com/knative/func/issues/3202
563-
t.Skip("Subscription E2E tests not yet implemented")
566+
brokerName := "default"
567+
568+
createBrokerWithCheck(t, Namespace, brokerName)
569+
570+
uniqueEventID := fmt.Sprintf("e2e-test-%d", time.Now().UnixNano())
571+
572+
eventReceived := waitForEvent(t, uniqueEventID)
573+
574+
subscriber := "func-e2e-test-subscriber"
575+
subscriberName := subscriber
576+
subscriberRoot := fromCleanEnv(t, subscriberName)
577+
if err := newCmd(t, "init", "-l=go", "-t=cloudevents").Run(); err != nil {
578+
t.Fatal(err)
579+
}
580+
if err := os.WriteFile(filepath.Join(subscriberRoot, "handle.go"),
581+
[]byte(subscriberCode()), 0644); err != nil {
582+
t.Fatal(err)
583+
}
584+
585+
subscribeCmd := exec.Command(Bin, "subscribe", "--filter", "type=test.event")
586+
subscribeCmd.Stdout, subscribeCmd.Stderr = os.Stdout, os.Stderr
587+
if err := subscribeCmd.Run(); err != nil {
588+
t.Fatal(err)
589+
}
590+
591+
f, err := fn.NewFunction(subscriberRoot)
592+
if err != nil {
593+
t.Fatal(err)
594+
}
595+
if len(f.Deploy.Subscriptions) != 1 {
596+
t.Fatalf("expected 1 subscription, got %d", len(f.Deploy.Subscriptions))
597+
}
598+
599+
if err := newCmd(t, "deploy").Run(); err != nil {
600+
t.Fatal(err)
601+
}
602+
defer clean(t, subscriberName, Namespace)
603+
604+
subscriberURL := fmt.Sprintf("http://%s.%s.%s", subscriberName, Namespace, Domain)
605+
if !waitFor(t, subscriberURL, withTemplate("cloudevents")) {
606+
t.Fatal("subscriber not ready")
607+
}
608+
waitForTrigger(t, Namespace, subscriberName)
609+
610+
transport := fnhttp.NewRoundTripper()
611+
defer transport.Close()
612+
client := http.Client{
613+
Transport: transport,
614+
Timeout: 30 * time.Second,
615+
}
616+
url := fmt.Sprintf("http://broker-ingress.knative-eventing.svc/%s/%s", Namespace, brokerName)
617+
req, _ := http.NewRequestWithContext(t.Context(), "POST", url, strings.NewReader(`{}`))
618+
req.Header.Set("Content-Type", "application/json")
619+
req.Header.Set("ce-specversion", "1.0")
620+
req.Header.Set("ce-type", "test.event")
621+
req.Header.Set("ce-source", "producer")
622+
req.Header.Set("ce-id", uniqueEventID)
623+
624+
resp, err := client.Do(req)
625+
if err != nil {
626+
t.Fatalf("Failed to invoke producer: %v", err)
627+
}
628+
body, _ := io.ReadAll(resp.Body)
629+
resp.Body.Close()
630+
if resp.StatusCode != 202 {
631+
t.Fatalf("Broker rejected event: code: %d, body: %q", resp.StatusCode, body)
632+
}
633+
t.Logf("Broker accepted event %s", uniqueEventID)
634+
635+
select {
636+
case receivedID := <-eventReceived:
637+
t.Logf("Event flow verified (received: %s)", receivedID)
638+
case <-time.After(60 * time.Second):
639+
t.Fatal("Timeout: No callback from subscriber")
640+
}
641+
}
642+
643+
func waitForEvent(t *testing.T, eventId string) <-chan string {
644+
t.Helper()
645+
646+
eventReceived := make(chan string, 10)
647+
648+
ctx, cancel := context.WithCancel(context.Background())
649+
t.Cleanup(cancel)
650+
651+
pr, pw := io.Pipe()
652+
cmd := exec.CommandContext(ctx, "stern", "func-e2e-test-subscriber-.*")
653+
cmd.Stderr = io.Discard
654+
cmd.Stdout = pw
655+
cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
656+
err := cmd.Start()
657+
if err != nil {
658+
t.Fatal(err)
659+
}
660+
go func() {
661+
r := bufio.NewReader(pr)
662+
m, e := regexp.MatchReader(`EVENT_RECEIVED: id=`+eventId, r)
663+
if e != nil {
664+
panic(e)
665+
}
666+
if m {
667+
eventReceived <- "OK"
668+
close(eventReceived)
669+
cancel()
670+
}
671+
_, _ = io.Copy(io.Discard, r)
672+
}()
673+
674+
return eventReceived
675+
}
676+
677+
// CloudEvents handler that logs events
678+
func subscriberCode() string {
679+
return `package function
680+
681+
import (
682+
"context"
683+
"fmt"
684+
"github.com/cloudevents/sdk-go/v2/event"
685+
)
686+
687+
func Handle(ctx context.Context, e event.Event) (*event.Event, error) {
688+
fmt.Printf("EVENT_RECEIVED: id=%s type=%s source=%s\n", e.ID(), e.Type(), e.Source())
689+
r := event.New()
690+
r.SetID("response-" + e.ID())
691+
r.SetSource("subscriber")
692+
r.SetType("test.response")
693+
r.SetData("application/json", map[string]string{"status": "received"})
694+
return &r, nil
695+
}
696+
`
697+
}
698+
699+
// createBrokerWithCheck creates a Knative Broker
700+
func createBrokerWithCheck(t *testing.T, namespace, name string) {
701+
t.Helper()
702+
703+
brokerYAML := fmt.Sprintf(`apiVersion: eventing.knative.dev/v1
704+
kind: Broker
705+
metadata:
706+
name: %s
707+
namespace: %s
708+
`, name, namespace)
709+
710+
cmd := exec.Command("kubectl", "apply", "-f", "-")
711+
cmd.Stdin = strings.NewReader(brokerYAML)
712+
cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
713+
714+
output, err := cmd.CombinedOutput()
715+
if err != nil {
716+
t.Fatalf("Failed to create broker: %v, output: %s", err, string(output))
717+
}
718+
t.Cleanup(func() {
719+
deleteBroker(t, namespace, name)
720+
})
721+
t.Logf("Created broker %s in namespace %s", name, namespace)
722+
723+
waitCmd := exec.Command("kubectl", "wait", "--for=condition=Ready",
724+
fmt.Sprintf("broker/%s", name), "-n", namespace, "--timeout=60s")
725+
waitCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
726+
waitOutput, err := waitCmd.CombinedOutput()
727+
if err != nil {
728+
t.Logf("Broker not ready: %v, output: %s", err, string(waitOutput))
729+
}
730+
t.Logf("Broker %s is ready", name)
731+
732+
// Wait for broker-ingress service to be available
733+
t.Log("Waiting for broker-ingress service to be available...")
734+
for i := 0; i < 30; i++ {
735+
checkCmd := exec.Command("kubectl", "get", "svc", "-n", "knative-eventing", "broker-ingress")
736+
checkCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
737+
if err := checkCmd.Run(); err == nil {
738+
t.Log("broker-ingress service is available")
739+
return
740+
}
741+
time.Sleep(2 * time.Second)
742+
}
743+
t.Fatal("broker-ingress service check timed out")
744+
}
745+
746+
// deleteBroker removes a Knative Broker from the given namespace.
747+
func deleteBroker(t *testing.T, namespace, name string) {
748+
t.Helper()
749+
750+
cmd := exec.Command("kubectl", "delete", "broker", name, "-n", namespace, "--ignore-not-found")
751+
cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
752+
753+
output, err := cmd.CombinedOutput()
754+
if err != nil {
755+
t.Logf("Warning: could not delete broker: %v, output: %s", err, string(output))
756+
return
757+
}
758+
t.Logf("Deleted broker %s from namespace %s", name, namespace)
759+
}
760+
761+
// waitForTrigger waits for the function's trigger to become ready.
762+
func waitForTrigger(t *testing.T, namespace, functionName string) {
763+
t.Helper()
764+
765+
triggerName := fmt.Sprintf("%s-function-trigger-0", functionName)
766+
767+
cmd := exec.Command("kubectl", "wait", "--for=condition=Ready",
768+
fmt.Sprintf("trigger/%s", triggerName), "-n", namespace, "--timeout=60s")
769+
cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
770+
771+
output, err := cmd.CombinedOutput()
772+
if err != nil {
773+
t.Logf("Warning: trigger may not be ready: %v, output: %s", err, string(output))
774+
} else {
775+
t.Logf("Trigger %s is ready", triggerName)
776+
}
564777
}

0 commit comments

Comments
 (0)