Skip to content

Commit 29f2bbf

Browse files
committed
feat: add Pipeline glue type sequencing truncation, file write, and ring publish
1 parent fcf52be commit 29f2bbf

1 file changed

Lines changed: 67 additions & 0 deletions

File tree

server/lib/events/pipeline.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package events
2+
3+
import (
4+
"sync/atomic"
5+
"time"
6+
)
7+
8+
// Pipeline glues a RingBuffer and a FileWriter into a single write path.
9+
// A single call to Publish stamps the event with a monotonic sequence number,
10+
// applies truncation, durably appends it to the per-category log file, and
11+
// then makes it available to ring buffer readers.
12+
type Pipeline struct {
13+
ring *RingBuffer
14+
files *FileWriter
15+
seq atomic.Uint64
16+
captureSessionID atomic.Value // stores string
17+
}
18+
19+
// NewPipeline returns a Pipeline backed by the supplied ring and file writer.
20+
func NewPipeline(ring *RingBuffer, files *FileWriter) *Pipeline {
21+
p := &Pipeline{ring: ring, files: files}
22+
p.captureSessionID.Store("")
23+
return p
24+
}
25+
26+
// Start sets the capture session ID that will be stamped on every subsequent
27+
// published event. It may be called at any time; the change is immediately
28+
// visible to concurrent Publish calls.
29+
func (p *Pipeline) Start(captureSessionID string) {
30+
p.captureSessionID.Store(captureSessionID)
31+
}
32+
33+
// Publish stamps, truncates, files, and broadcasts a single event.
34+
//
35+
// Ordering:
36+
// 1. Stamp CaptureSessionID, Seq, Ts (Ts only if caller left it zero)
37+
// 2. Apply truncateIfNeeded (SCHEMA-04) — must happen before both sinks
38+
// 3. Write to FileWriter (durable before in-memory)
39+
// 4. Publish to RingBuffer (in-memory fan-out)
40+
//
41+
// Errors from FileWriter.Write are silently dropped; the ring buffer always
42+
// receives the event even if the file write fails.
43+
func (p *Pipeline) Publish(ev BrowserEvent) {
44+
ev.CaptureSessionID = p.captureSessionID.Load().(string)
45+
ev.Seq = p.seq.Add(1) // starts at 1
46+
if ev.Ts == 0 {
47+
ev.Ts = time.Now().UnixMilli()
48+
}
49+
ev = truncateIfNeeded(ev)
50+
51+
// File write first — durable before in-memory.
52+
_ = p.files.Write(ev)
53+
54+
// Ring buffer last — readers see the event after the file is written.
55+
p.ring.Publish(ev)
56+
}
57+
58+
// NewReader returns a Reader positioned at the start of the ring buffer.
59+
func (p *Pipeline) NewReader() *Reader {
60+
return p.ring.NewReader()
61+
}
62+
63+
// Close closes the underlying FileWriter, flushing and releasing all open
64+
// file descriptors.
65+
func (p *Pipeline) Close() error {
66+
return p.files.Close()
67+
}

0 commit comments

Comments
 (0)