-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathmain.go
More file actions
539 lines (469 loc) · 18.2 KB
/
main.go
File metadata and controls
539 lines (469 loc) · 18.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
package main
import (
"context"
_ "embed"
"errors"
"fmt"
"log/slog"
"net"
"net/http"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"github.com/c2h5oh/datasize"
"github.com/getkin/kin-openapi/openapi3filter"
"github.com/ghodss/yaml"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/kernel/hypeman"
"github.com/kernel/hypeman/cmd/api/api"
"github.com/kernel/hypeman/cmd/api/config"
"github.com/kernel/hypeman/lib/devices"
"github.com/kernel/hypeman/lib/guest"
"github.com/kernel/hypeman/lib/hypervisor/qemu"
"github.com/kernel/hypeman/lib/instances"
mw "github.com/kernel/hypeman/lib/middleware"
"github.com/kernel/hypeman/lib/oapi"
"github.com/kernel/hypeman/lib/otel"
"github.com/kernel/hypeman/lib/registry"
"github.com/kernel/hypeman/lib/scopes"
"github.com/kernel/hypeman/lib/vmm"
nethttpmiddleware "github.com/oapi-codegen/nethttp-middleware"
"github.com/riandyrn/otelchi"
"golang.org/x/sync/errgroup"
)
func main() {
if err := run(); err != nil {
slog.Error("application terminated", "error", err)
os.Exit(1)
}
slog.Info("main() exiting normally")
}
func metricsServerAddress(cfg *config.Config) string {
return net.JoinHostPort(cfg.Metrics.ListenAddress, strconv.Itoa(cfg.Metrics.Port))
}
func newMetricsServer(addr string, handler http.Handler) *http.Server {
mux := http.NewServeMux()
mux.Handle("/metrics", handler)
return &http.Server{
Addr: addr,
Handler: mux,
}
}
func run() error {
// Load config early for OTel initialization
// Config path can be specified via CONFIG_PATH env var or defaults to platform-specific locations
configPath := os.Getenv("CONFIG_PATH")
cfg, err := config.Load(configPath)
if err != nil {
return fmt.Errorf("failed to load config: %w", err)
}
// Validate configuration before proceeding
if err := cfg.Validate(); err != nil {
return fmt.Errorf("invalid configuration: %w", err)
}
// Configure GPU profile cache TTL
devices.SetGPUProfileCacheTTL(cfg.GPU.ProfileCacheTTL)
// Initialize OpenTelemetry (before wire initialization)
otelCfg := otel.Config{
Enabled: cfg.Otel.Enabled,
Endpoint: cfg.Otel.Endpoint,
ServiceName: cfg.Otel.ServiceName,
ServiceInstanceID: cfg.Otel.ServiceInstanceID,
Insecure: cfg.Otel.Insecure,
MetricExportInterval: cfg.Otel.MetricExportInterval,
Version: cfg.Version,
Env: cfg.Env,
}
otelProvider, otelShutdown, err := otel.Init(context.Background(), otelCfg)
if err != nil {
return fmt.Errorf("initialize telemetry: %w", err)
}
defer func() {
slog.Info("shutting down OpenTelemetry")
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := otelShutdown(shutdownCtx); err != nil {
slog.Warn("error shutting down OpenTelemetry", "error", err)
}
slog.Info("OpenTelemetry shutdown complete")
}()
// Initialize guest and vmm metrics.
if otelProvider.Meter != nil {
guestMetrics, err := guest.NewMetrics(otelProvider.Meter)
if err == nil {
guest.SetMetrics(guestMetrics)
}
vmmMetrics, err := vmm.NewMetrics(otelProvider.Meter)
if err == nil {
vmm.SetMetrics(vmmMetrics)
}
}
// Set global OTel log handler for logger package
if otelProvider.LogHandler != nil {
otel.SetGlobalLogHandler(otelProvider.LogHandler)
}
// Initialize app with wire
app, cleanup, err := initializeApp()
if err != nil {
return fmt.Errorf("initialize application: %w", err)
}
defer func() {
slog.Info("cleaning up application resources")
cleanup()
slog.Info("application cleanup complete")
}()
ctx, stop := signal.NotifyContext(app.Ctx, os.Interrupt, syscall.SIGTERM)
defer func() {
slog.Info("stopping signal handler")
stop()
slog.Info("signal handler stopped")
}()
logger := app.Logger
// Log OTel status
if cfg.Otel.Enabled {
logger.Info("OpenTelemetry push enabled", "endpoint", cfg.Otel.Endpoint, "service", cfg.Otel.ServiceName, "metric_export_interval", cfg.Otel.MetricExportInterval)
} else {
logger.Info("OpenTelemetry push disabled; Prometheus pull metrics remain available")
}
// Validate JWT secret is configured
if app.Config.JwtSecret == "" {
logger.Warn("JWT_SECRET not configured - API authentication will fail")
}
// Verify hypervisor access (KVM on Linux, Virtualization.framework on macOS)
if err := checkHypervisorAccess(); err != nil {
return fmt.Errorf("hypervisor access check failed: %w", err)
}
logger.Info("Hypervisor access verified", "type", hypervisorAccessCheckName())
// Check if QEMU is available (optional - only warn if not present)
if _, err := (&qemu.Starter{}).GetBinaryPath(nil, ""); err != nil {
logger.Warn("QEMU not available - QEMU hypervisor will not work", "error", err)
}
// Validate log rotation config
var logMaxSize datasize.ByteSize
if err := logMaxSize.UnmarshalText([]byte(app.Config.Logging.MaxSize)); err != nil {
return fmt.Errorf("invalid LOG_MAX_SIZE %q: %w", app.Config.Logging.MaxSize, err)
}
logRotateInterval, err := time.ParseDuration(app.Config.Logging.RotateInterval)
if err != nil {
return fmt.Errorf("invalid LOG_ROTATE_INTERVAL %q: %w", app.Config.Logging.RotateInterval, err)
}
// Ensure system files (kernel, initrd) exist before starting server
logger.Info("Ensuring system files...")
if err := app.SystemManager.EnsureSystemFiles(app.Ctx); err != nil {
logger.Error("failed to ensure system files", "error", err)
os.Exit(1)
}
kernelVer := app.SystemManager.GetDefaultKernelVersion()
logger.Info("System files ready",
"kernel", kernelVer)
// Initialize network manager (creates default network if needed)
// Get instance IDs that might have a running VMM for TAP cleanup safety.
// Include Unknown state: we couldn't confirm their state, but they might still
// have a running VMM. Better to leave a stale TAP than crash a running VM.
var preserveTAPs []string
allInstances, err := app.InstanceManager.ListInstances(app.Ctx, nil)
if err != nil {
// On error, skip TAP cleanup entirely to avoid crashing running VMs.
// Pass nil to Initialize to skip cleanup.
logger.Warn("failed to list instances for TAP cleanup, skipping cleanup", "error", err)
preserveTAPs = nil
} else {
// Initialize to empty slice (not nil) so cleanup runs even with no running VMs
preserveTAPs = []string{}
for _, inst := range allInstances {
if inst.State == instances.StateRunning || inst.State == instances.StateInitializing || inst.State == instances.StateUnknown {
preserveTAPs = append(preserveTAPs, inst.Id)
}
}
}
logger.Info("Initializing network manager...")
if err := app.NetworkManager.Initialize(app.Ctx, preserveTAPs); err != nil {
logger.Error("failed to initialize network manager", "error", err)
return fmt.Errorf("initialize network manager: %w", err)
}
// Set up HTB qdisc on bridge for network fair sharing
networkCapacity := app.ResourceManager.NetworkCapacity()
if err := app.NetworkManager.SetupHTB(app.Ctx, networkCapacity); err != nil {
logger.Warn("failed to setup HTB on bridge (network rate limiting disabled)", "error", err)
}
// Reconcile device state (clears orphaned attachments from crashed VMs)
// Set up liveness checker so device reconciliation can accurately detect orphaned attachments
logger.Info("Reconciling device state...")
livenessChecker := instances.NewLivenessChecker(app.InstanceManager)
if livenessChecker != nil {
app.DeviceManager.SetLivenessChecker(livenessChecker)
}
if err := app.DeviceManager.ReconcileDevices(app.Ctx); err != nil {
logger.Error("failed to reconcile device state", "error", err)
return fmt.Errorf("reconcile device state: %w", err)
}
// Reconcile mdev devices (clears orphaned vGPUs from previous runs)
logger.Info("Reconciling mdev devices...")
if err := devices.ReconcileMdevs(app.Ctx, nil); err != nil {
// Log but don't fail - mdev cleanup is best-effort
logger.Warn("failed to reconcile mdev devices", "error", err)
}
// Wire up resource validator for aggregate limit checking
// This enables the instance manager to validate CPU, memory, network, and GPU
// availability before creating or starting instances.
app.InstanceManager.SetResourceValidator(app.ResourceManager)
logger.Info("Resource validator configured")
// Initialize ingress manager (starts Caddy daemon and DNS server for dynamic upstreams)
logger.Info("Initializing ingress manager...")
if err := app.IngressManager.Initialize(app.Ctx); err != nil {
logger.Error("failed to initialize ingress manager", "error", err)
return fmt.Errorf("initialize ingress manager: %w", err)
}
logger.Info("Ingress manager initialized", "listen_addr", cfg.Caddy.ListenAddress, "admin", app.IngressManager.AdminURL())
// Create router
r := chi.NewRouter()
// Prepare HTTP metrics middleware (applied inside API group, not globally)
// Global application breaks WebSocket (Hijacker) and SSE (Flusher)
var httpMetricsMw func(http.Handler) http.Handler
if otelProvider.Meter != nil {
httpMetrics, err := mw.NewHTTPMetrics(otelProvider.Meter)
if err == nil {
httpMetricsMw = httpMetrics.Middleware
}
}
// Create access logger with OTel handler for HTTP request logging with trace correlation
var accessLogHandler slog.Handler
if otelProvider != nil {
accessLogHandler = otelProvider.LogHandler
}
accessLogger := mw.NewAccessLogger(accessLogHandler)
// Load OpenAPI spec for request validation
spec, err := oapi.GetSwagger()
if err != nil {
return fmt.Errorf("failed to load OpenAPI spec: %w", err)
}
// Clear servers to avoid host validation issues
// See: https://github.com/oapi-codegen/nethttp-middleware#usage
spec.Servers = nil
// Custom exec endpoint (outside OpenAPI spec, uses WebSocket)
// Note: No otelchi here as WebSocket doesn't work well with tracing middleware
r.With(
middleware.RequestID,
middleware.RealIP,
middleware.Recoverer,
mw.InjectLogger(logger),
mw.AccessLogger(accessLogger),
mw.JwtAuth(app.Config.JwtSecret),
scopes.RequireScope(scopes.InstanceWrite),
mw.ResolveResource(app.ApiService.NewResolvers(), api.ResolverErrorResponder),
).Get("/instances/{id}/exec", app.ApiService.ExecHandler)
// Custom cp endpoint (outside OpenAPI spec, uses WebSocket)
r.With(
middleware.RequestID,
middleware.RealIP,
middleware.Recoverer,
mw.InjectLogger(logger),
mw.AccessLogger(accessLogger),
mw.JwtAuth(app.Config.JwtSecret),
scopes.RequireScope(scopes.InstanceWrite),
mw.ResolveResource(app.ApiService.NewResolvers(), api.ResolverErrorResponder),
).Get("/instances/{id}/cp", app.ApiService.CpHandler)
// Create builder VM resolver for secure token authentication
// This validates that token requests from builder VMs are for their authorized repos only
// Create token handler for Docker Registry Token Authentication
// All clients must provide explicit credentials (Basic or Bearer auth with JWT)
tokenHandler := registry.NewTokenHandler(app.Config.JwtSecret)
// OCI Distribution registry endpoints for image push (outside OpenAPI spec)
r.Route("/v2", func(r chi.Router) {
r.Use(middleware.RequestID)
r.Use(middleware.RealIP)
r.Use(middleware.Recoverer)
if cfg.Otel.Enabled {
r.Use(otelchi.Middleware(cfg.Otel.ServiceName, otelchi.WithChiRoutes(r)))
}
r.Use(mw.InjectLogger(logger))
r.Use(mw.AccessLogger(accessLogger))
r.Use(mw.JwtAuth(app.Config.JwtSecret))
// Token endpoint for Docker Registry Token Authentication
// This is called by clients (like BuildKit) after receiving a 401 with WWW-Authenticate
r.Get("/token", tokenHandler.ServeHTTP)
r.Mount("/", app.Registry.Handler())
})
// Authenticated API endpoints
r.Group(func(r chi.Router) {
// Common middleware
r.Use(middleware.RequestID)
r.Use(middleware.RealIP)
r.Use(middleware.Recoverer)
// OpenTelemetry tracing middleware FIRST (creates span context)
if cfg.Otel.Enabled {
r.Use(otelchi.Middleware(cfg.Otel.ServiceName, otelchi.WithChiRoutes(r)))
}
// Inject logger into request context for handlers to use
// Use app logger (not accessLogger) so the instance log handler is included
r.Use(mw.InjectLogger(logger))
// Access logger AFTER otelchi so trace context is available
r.Use(mw.AccessLogger(accessLogger))
if httpMetricsMw != nil {
// Skip HTTP metrics for SSE streaming endpoints (logs)
r.Use(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.HasSuffix(r.URL.Path, "/logs") {
next.ServeHTTP(w, r)
return
}
httpMetricsMw(next).ServeHTTP(w, r)
})
})
}
r.Use(middleware.Timeout(60 * time.Second))
// OpenAPI request validation with authentication
validatorOptions := &nethttpmiddleware.Options{
Options: openapi3filter.Options{
AuthenticationFunc: mw.OapiAuthenticationFunc(app.Config.JwtSecret),
},
ErrorHandler: mw.OapiErrorHandler,
}
r.Use(nethttpmiddleware.OapiRequestValidatorWithOptions(spec, validatorOptions))
// Scoped permissions — enforce per-route scope requirements
r.Use(scopes.Middleware())
// Resource resolver middleware - resolves IDs/names/prefixes before handlers
// Enriches context with resolved resource and logger with resolved ID
r.Use(mw.ResolveResource(app.ApiService.NewResolvers(), api.ResolverErrorResponder))
// Setup strict handler
strictHandler := oapi.NewStrictHandler(app.ApiService, nil)
// Mount API routes (authentication now handled by validation middleware)
oapi.HandlerWithOptions(strictHandler, oapi.ChiServerOptions{
BaseRouter: r,
Middlewares: []oapi.MiddlewareFunc{api.NormalizeOptionalStandbyBody},
})
})
// Unauthenticated endpoints (outside group)
r.Get("/spec.yaml", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/vnd.oai.openapi")
w.Write(hypeman.OpenAPIYAML)
})
r.Get("/spec.json", func(w http.ResponseWriter, r *http.Request) {
jsonData, err := yaml.YAMLToJSON(hypeman.OpenAPIYAML)
if err != nil {
http.Error(w, "Failed to convert YAML to JSON", http.StatusInternalServerError)
logger.ErrorContext(r.Context(), "Failed to convert YAML to JSON", "error", err)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(jsonData)
})
r.Get("/swagger", api.SwaggerUIHandler)
// Create HTTP server
srv := &http.Server{
Addr: fmt.Sprintf(":%s", app.Config.Port),
Handler: r,
}
metricsAddr := metricsServerAddress(cfg)
if otelProvider.MetricsHandler == nil {
return fmt.Errorf("metrics handler is not initialized")
}
metricsSrv := newMetricsServer(metricsAddr, otelProvider.MetricsHandler)
// Error group for coordinated shutdown
grp, gctx := errgroup.WithContext(ctx)
// Start build manager background services (vsock handler for builder VMs)
if err := app.BuildManager.Start(gctx); err != nil {
logger.Error("failed to start build manager", "error", err)
return err
}
grp.Go(func() error {
if app.GuestMemoryController == nil {
return nil
}
logger.Info("starting guest memory controller")
return app.GuestMemoryController.Start(gctx)
})
// Run the server
grp.Go(func() error {
logger.Info("starting hypeman API", "port", app.Config.Port)
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Error("http server error", "error", err)
return err
}
return nil
})
grp.Go(func() error {
logger.Info("starting metrics endpoint", "addr", metricsAddr, "path", "/metrics")
if err := metricsSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Error("metrics server error", "error", err)
return err
}
return nil
})
// Shutdown handler
grp.Go(func() error {
<-gctx.Done()
logger.Info("shutdown signal received")
// Use WithoutCancel to preserve context values while preventing cancellation
shutdownCtx := context.WithoutCancel(gctx)
shutdownCtx, cancel := context.WithTimeout(shutdownCtx, 30*time.Second)
defer cancel()
var shutdownErrs []error
if err := srv.Shutdown(shutdownCtx); err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Error("failed to shutdown http server", "error", err)
shutdownErrs = append(shutdownErrs, fmt.Errorf("shutdown http server: %w", err))
} else {
logger.Info("http server shutdown complete")
}
if err := metricsSrv.Shutdown(shutdownCtx); err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Error("failed to shutdown metrics server", "error", err)
shutdownErrs = append(shutdownErrs, fmt.Errorf("shutdown metrics server: %w", err))
} else {
logger.Info("metrics server shutdown complete")
}
// Shutdown ingress manager (stops Caddy if CADDY_STOP_ON_SHUTDOWN=true)
if err := app.IngressManager.Shutdown(shutdownCtx); err != nil {
logger.Error("failed to shutdown ingress manager", "error", err)
// Don't return error - continue with shutdown
} else {
logger.Info("ingress manager shutdown complete")
}
return errors.Join(shutdownErrs...)
})
// Log rotation scheduler
grp.Go(func() error {
ticker := time.NewTicker(logRotateInterval)
defer ticker.Stop()
logger.Info("log rotation scheduler started", "interval", app.Config.Logging.RotateInterval, "max_size", logMaxSize, "max_files", app.Config.Logging.MaxFiles)
for {
select {
case <-gctx.Done():
return nil
case <-ticker.C:
if err := app.InstanceManager.RotateLogs(gctx, int64(logMaxSize), app.Config.Logging.MaxFiles); err != nil {
logger.Error("log rotation failed", "error", err)
} else {
logger.Debug("log rotation completed", "max_size", logMaxSize, "max_files", app.Config.Logging.MaxFiles)
}
}
}
})
// Snapshot schedule scheduler
if scheduleManager, ok := app.InstanceManager.(instances.SnapshotScheduleManager); ok {
const snapshotSchedulePollInterval = time.Minute
grp.Go(func() error {
ticker := time.NewTicker(snapshotSchedulePollInterval)
defer ticker.Stop()
logger.Info("snapshot schedule scheduler started", "interval", snapshotSchedulePollInterval)
for {
select {
case <-gctx.Done():
return nil
case <-ticker.C:
if err := scheduleManager.RunSnapshotSchedules(gctx); err != nil {
logger.Error("snapshot schedule run completed with errors", "error", err)
}
}
}
})
} else {
logger.Warn("snapshot schedule manager unavailable; scheduled snapshots disabled")
}
err = grp.Wait()
slog.Info("all goroutines finished")
return err
}