Skip to content

Commit d9c8647

Browse files
authored
Merge branch 'minhd-vu/sensor-broadcast' into minhd-vu/rpc-methods
2 parents bce9585 + 5d4ce45 commit d9c8647

9 files changed

Lines changed: 293 additions & 267 deletions

File tree

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ jobs:
3838
timeout-minutes: 5
3939
steps:
4040
- uses: actions/checkout@v6
41-
- uses: crate-ci/typos@78bc6fb2c0d734235d57a2d6b9de923cc325ebdd # v1.43.4
41+
- uses: crate-ci/typos@57b11c6b7e54c402ccd9cda953f1072ec4f78e33 # v1.43.5
4242

4343
gen-doc:
4444
name: Check gen-doc generated files

cmd/p2p/sensor/api.go

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package sensor
22

33
import (
4-
"context"
54
"encoding/json"
65
"fmt"
76
"net/http"
@@ -58,8 +57,7 @@ type apiData struct {
5857
// handleAPI sets up the API for interacting with the sensor. All endpoints
5958
// return information about the sensor node and all connected peers, including
6059
// the types and counts of eth packets sent and received by each peer.
61-
// The server gracefully shuts down when the context is cancelled.
62-
func handleAPI(ctx context.Context, server *ethp2p.Server, conns *p2p.Conns) {
60+
func handleAPI(server *ethp2p.Server, conns *p2p.Conns) {
6361
mux := http.NewServeMux()
6462
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
6563
if r.Method != http.MethodGet {
@@ -119,16 +117,7 @@ func handleAPI(ctx context.Context, server *ethp2p.Server, conns *p2p.Conns) {
119117
})
120118

121119
addr := fmt.Sprintf(":%d", inputSensorParams.APIPort)
122-
httpServer := &http.Server{Addr: addr, Handler: mux}
123-
124-
go func() {
125-
<-ctx.Done()
126-
if err := httpServer.Shutdown(context.Background()); err != nil {
127-
log.Error().Err(err).Msg("Failed to shutdown API server")
128-
}
129-
}()
130-
131-
if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
120+
if err := http.ListenAndServe(addr, mux); err != nil {
132121
log.Error().Err(err).Msg("Failed to start API handler")
133122
}
134123
}

cmd/p2p/sensor/rpc.go

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package sensor
22

33
import (
4-
"context"
54
"encoding/json"
65
"fmt"
76
"io"
@@ -44,8 +43,7 @@ type rpcError struct {
4443
// It handles eth_sendRawTransaction requests, validates transaction signatures,
4544
// and broadcasts valid transactions to all connected peers.
4645
// Supports both single requests and batch requests per JSON-RPC 2.0 specification.
47-
// The server gracefully shuts down when the context is cancelled.
48-
func handleRPC(ctx context.Context, conns *p2p.Conns, networkID uint64) {
46+
func handleRPC(conns *p2p.Conns, networkID uint64) {
4947
// Use network ID as chain ID for signature validation
5048
chainID := new(big.Int).SetUint64(networkID)
5149

@@ -97,17 +95,8 @@ func handleRPC(ctx context.Context, conns *p2p.Conns, networkID uint64) {
9795
})
9896

9997
addr := fmt.Sprintf(":%d", inputSensorParams.RPCPort)
100-
server := &http.Server{Addr: addr, Handler: mux}
101-
102-
go func() {
103-
<-ctx.Done()
104-
if err := server.Shutdown(context.Background()); err != nil {
105-
log.Error().Err(err).Msg("Failed to shutdown RPC server")
106-
}
107-
}()
108-
10998
log.Info().Str("addr", addr).Msg("Starting JSON-RPC server")
110-
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
99+
if err := http.ListenAndServe(addr, mux); err != nil {
111100
log.Error().Err(err).Msg("Failed to start RPC server")
112101
}
113102
}

cmd/p2p/sensor/sensor.go

Lines changed: 63 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
_ "embed"
77
"errors"
88
"fmt"
9-
"os"
109
"os/signal"
1110
"syscall"
1211
"time"
@@ -173,7 +172,10 @@ var SensorCmd = &cobra.Command{
173172
return nil
174173
},
175174
RunE: func(cmd *cobra.Command, args []string) error {
176-
db, err := newDatabase(cmd.Context())
175+
ctx, stop := signal.NotifyContext(cmd.Context(), syscall.SIGINT, syscall.SIGTERM)
176+
defer stop()
177+
178+
db, err := newDatabase(ctx)
177179
if err != nil {
178180
return err
179181
}
@@ -214,7 +216,7 @@ var SensorCmd = &cobra.Command{
214216
})
215217

216218
opts := p2p.EthProtocolOptions{
217-
Context: cmd.Context(),
219+
Context: ctx,
218220
Database: db,
219221
GenesisHash: common.HexToHash(inputSensorParams.GenesisHash),
220222
RPC: inputSensorParams.RPC,
@@ -259,64 +261,36 @@ var SensorCmd = &cobra.Command{
259261
if err = server.Start(); err != nil {
260262
return err
261263
}
262-
defer server.Stop()
264+
defer stopServer(&server)
263265

264266
events := make(chan *ethp2p.PeerEvent)
265267
sub := server.SubscribeEvents(events)
266268
defer sub.Unsubscribe()
267269

268-
ticker := time.NewTicker(2 * time.Second) // Ticker for recurring tasks every 2 seconds.
269-
ticker1h := time.NewTicker(time.Hour) // Ticker for running DNS discovery every hour.
270+
ticker := time.NewTicker(2 * time.Second)
270271
defer ticker.Stop()
271-
defer ticker1h.Stop()
272-
273-
dnsLock := make(chan struct{}, 1)
274-
signals := make(chan os.Signal, 1)
275-
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
276-
277-
// Create a cancellable context for graceful shutdown of background goroutines.
278-
ctx, cancel := context.WithCancel(cmd.Context())
279-
defer cancel()
280272

281273
if inputSensorParams.ShouldRunPprof {
282-
go handlePprof(ctx)
274+
go handlePprof()
283275
}
284276

285277
if inputSensorParams.ShouldRunPrometheus {
286-
go handlePrometheus(ctx)
278+
go handlePrometheus()
287279
}
288280

289-
go handleAPI(ctx, &server, conns)
290-
291-
// Start the RPC server for receiving transactions
292-
go handleRPC(ctx, conns, inputSensorParams.NetworkID)
293-
294-
// Run DNS discovery immediately at startup.
295-
go handleDNSDiscovery(ctx, &server, dnsLock)
281+
go handleAPI(&server, conns)
282+
go handleRPC(conns, inputSensorParams.NetworkID)
283+
go handleDNSDiscovery(&server)
296284

297285
for {
298286
select {
299287
case <-ticker.C:
300288
peersGauge.Set(float64(server.PeerCount()))
301-
db.WritePeers(cmd.Context(), server.Peers(), time.Now())
302-
289+
db.WritePeers(ctx, server.Peers(), time.Now())
303290
metrics.Update(conns.HeadBlock().Block, conns.OldestBlock())
304-
305-
urls := []string{}
306-
for _, peer := range server.Peers() {
307-
urls = append(urls, peer.Node().URLv4())
308-
}
309-
310-
if err := p2p.WritePeers(inputSensorParams.NodesFile, urls); err != nil {
311-
log.Error().Err(err).Msg("Failed to write nodes to file")
312-
}
313-
case <-ticker1h.C:
314-
go handleDNSDiscovery(ctx, &server, dnsLock)
315-
case <-signals:
316-
// This gracefully stops the sensor so that the peers can be written to
317-
// the nodes file.
291+
writePeers(server.Peers())
292+
case <-ctx.Done():
318293
log.Info().Msg("Stopping sensor...")
319-
cancel()
320294
return nil
321295
case event := <-events:
322296
log.Debug().Any("event", event).Send()
@@ -327,70 +301,78 @@ var SensorCmd = &cobra.Command{
327301
},
328302
}
329303

330-
// handlePprof starts a server for performance profiling using pprof on the
331-
// specified port. This allows for real-time monitoring and analysis of the
332-
// sensor's performance. The port number is configured through
333-
// inputSensorParams.PprofPort. The server gracefully shuts down when the
334-
// context is cancelled.
335-
func handlePprof(ctx context.Context) {
336-
addr := fmt.Sprintf(":%d", inputSensorParams.PprofPort)
337-
server := &http.Server{Addr: addr}
304+
// writePeers writes the enode URLs of connected peers to the nodes file.
305+
func writePeers(peers []*ethp2p.Peer) {
306+
urls := make([]string, 0, len(peers))
307+
for _, peer := range peers {
308+
urls = append(urls, peer.Node().URLv4())
309+
}
310+
311+
if err := p2p.WritePeers(inputSensorParams.NodesFile, urls); err != nil {
312+
log.Error().Err(err).Msg("Failed to write nodes to file")
313+
}
314+
}
315+
316+
// stopServer stops the p2p server with a timeout to avoid hanging on shutdown.
317+
// This is necessary because go-ethereum's discovery shutdown can deadlock.
318+
func stopServer(server *ethp2p.Server) {
319+
done := make(chan struct{})
338320

339321
go func() {
340-
<-ctx.Done()
341-
if err := server.Shutdown(context.Background()); err != nil {
342-
log.Error().Err(err).Msg("Failed to shutdown pprof server")
343-
}
322+
server.Stop()
323+
close(done)
344324
}()
345325

346-
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
326+
select {
327+
case <-done:
328+
case <-time.After(5 * time.Second):
329+
}
330+
}
331+
332+
// handlePprof starts a server for performance profiling using pprof on the
333+
// specified port. This allows for real-time monitoring and analysis of the
334+
// sensor's performance. The port number is configured through
335+
// inputSensorParams.PprofPort. An error is logged if the server fails to start.
336+
func handlePprof() {
337+
addr := fmt.Sprintf(":%d", inputSensorParams.PprofPort)
338+
if err := http.ListenAndServe(addr, nil); err != nil {
347339
log.Error().Err(err).Msg("Failed to start pprof")
348340
}
349341
}
350342

351343
// handlePrometheus starts a server to expose Prometheus metrics at the /metrics
352344
// endpoint. This enables Prometheus to scrape and collect metrics data for
353345
// monitoring purposes. The port number is configured through
354-
// inputSensorParams.PrometheusPort. The server gracefully shuts down when the
355-
// context is cancelled.
356-
func handlePrometheus(ctx context.Context) {
357-
mux := http.NewServeMux()
358-
mux.Handle("/metrics", promhttp.Handler())
359-
346+
// inputSensorParams.PrometheusPort. An error is logged if the server fails to
347+
// start.
348+
func handlePrometheus() {
349+
http.Handle("/metrics", promhttp.Handler())
360350
addr := fmt.Sprintf(":%d", inputSensorParams.PrometheusPort)
361-
server := &http.Server{Addr: addr, Handler: mux}
362-
363-
go func() {
364-
<-ctx.Done()
365-
if err := server.Shutdown(context.Background()); err != nil {
366-
log.Error().Err(err).Msg("Failed to shutdown Prometheus server")
367-
}
368-
}()
369-
370-
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
351+
if err := http.ListenAndServe(addr, nil); err != nil {
371352
log.Error().Err(err).Msg("Failed to start Prometheus handler")
372353
}
373354
}
374355

375356
// handleDNSDiscovery performs DNS-based peer discovery and adds new peers to
376357
// the p2p server. It uses an iterator to discover peers incrementally rather
377-
// than loading all nodes at once. The lock channel prevents concurrent runs.
378-
// Discovery stops when the context is cancelled.
379-
func handleDNSDiscovery(ctx context.Context, server *ethp2p.Server, lock chan struct{}) {
358+
// than loading all nodes at once. Runs immediately and then hourly.
359+
func handleDNSDiscovery(server *ethp2p.Server) {
380360
if len(inputSensorParams.DiscoveryDNS) == 0 {
381361
return
382362
}
383363

384-
select {
385-
case lock <- struct{}{}:
386-
defer func() { <-lock }()
387-
case <-ctx.Done():
388-
return
389-
default:
390-
log.Warn().Msg("DNS discovery already running, skipping")
391-
return
364+
discoverPeers(server)
365+
366+
ticker := time.NewTicker(time.Hour)
367+
defer ticker.Stop()
368+
369+
for range ticker.C {
370+
discoverPeers(server)
392371
}
372+
}
393373

374+
// discoverPeers performs a single DNS discovery round.
375+
func discoverPeers(server *ethp2p.Server) {
394376
log.Info().
395377
Str("discovery-dns", inputSensorParams.DiscoveryDNS).
396378
Msg("Starting DNS discovery")
@@ -403,27 +385,13 @@ func handleDNSDiscovery(ctx context.Context, server *ethp2p.Server, lock chan st
403385
}
404386
defer iter.Close()
405387

406-
// Add DNS-discovered peers using the iterator.
407388
count := 0
408389
for iter.Next() {
409-
// Check for context cancellation to stop discovery promptly.
410-
select {
411-
case <-ctx.Done():
412-
log.Info().
413-
Int("discovered_peers", count).
414-
Msg("DNS discovery interrupted")
415-
return
416-
default:
417-
}
418-
419390
node := iter.Node()
420391
log.Debug().
421392
Str("enode", node.URLv4()).
422393
Msg("Discovered peer through DNS")
423394

424-
// Add the peer to the static node set. The server itself handles whether to
425-
// connect to the peer if it's already connected. If a node is part of the
426-
// static peer set, the server will handle reconnecting after disconnects.
427395
server.AddPeer(node)
428396
count++
429397
}

go.mod

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ require (
3333
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
3434
golang.org/x/text v0.34.0
3535
golang.org/x/time v0.14.0
36-
google.golang.org/api v0.266.0
36+
google.golang.org/api v0.267.0
3737
google.golang.org/protobuf v1.36.11
3838
)
3939

@@ -163,15 +163,15 @@ require (
163163
golang.org/x/sys v0.41.0 // indirect
164164
golang.org/x/term v0.40.0 // indirect
165165
google.golang.org/genproto v0.0.0-20260128011058-8636f8732409 // indirect
166-
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect
166+
google.golang.org/genproto/googleapis/api v0.0.0-20260203192932-546029d2fa20 // indirect
167167
google.golang.org/genproto/googleapis/rpc v0.0.0-20260203192932-546029d2fa20 // indirect
168168
google.golang.org/grpc v1.78.0 // indirect
169169
gopkg.in/yaml.v3 v3.0.1 // indirect
170170
lukechampine.com/blake3 v1.3.0 // indirect
171171
)
172172

173173
require (
174-
cloud.google.com/go/kms v1.25.0
174+
cloud.google.com/go/kms v1.26.0
175175
github.com/0xPolygon/cdk-contracts-tooling v0.0.1
176176
github.com/chromedp/cdproto v0.0.0-20250724212937-08a3db8b4327
177177
github.com/chromedp/chromedp v0.14.2
@@ -180,7 +180,7 @@ require (
180180
github.com/fatih/color v1.18.0
181181
github.com/gateway-fm/vectorized-poseidon-gold v1.0.0
182182
github.com/gdamore/tcell/v2 v2.13.8
183-
github.com/go-echarts/go-echarts/v2 v2.6.7
183+
github.com/go-echarts/go-echarts/v2 v2.7.0
184184
github.com/go-errors/errors v1.5.1
185185
github.com/google/tink/go v1.7.0
186186
github.com/iden3/go-iden3-crypto v0.0.17

0 commit comments

Comments
 (0)