-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathFileChunkSink.ts
More file actions
92 lines (80 loc) · 2.62 KB
/
FileChunkSink.ts
File metadata and controls
92 lines (80 loc) · 2.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import type { BacktraceStorage } from '@backtrace/sdk-core';
import type { ChunkSink } from './Chunkifier';
import type { BacktraceStreamStorage } from './storage';
import type { FileWritableStream } from './StreamWriter';
interface FileChunkSinkOptions {
/**
* Maximum number of files.
*/
readonly maxFiles: number;
/**
* Full path to the chunk file.
*/
readonly file: (n: number) => string;
/**
* File system to use.
*/
readonly storage: BacktraceStorage & BacktraceStreamStorage;
}
/**
* Chunk sink which writes data to disk.
*
* Each time a new chunk is created, a new stream is created with path provided from options.
*/
export class FileChunkSink {
private readonly _streamTracker: LimitedFifo<FileWritableStream>;
/**
* Returns all files that have been written to and are not deleted.
*/
public get files() {
return this._streamTracker.elements;
}
constructor(private readonly _options: FileChunkSinkOptions) {
// Track files using a FIFO queue
this._streamTracker = limitedFifo<FileWritableStream>(_options.maxFiles, async (stream) => {
await stream
.close()
.catch(() => {
// Fail silently here, there's not much we can do about this
})
.finally(() =>
_options.storage.remove(stream.path).catch(() => {
// Fail silently here, there's not much we can do about this
}),
);
});
}
/**
* Returns `ChunkSink`. Pass this to `chunkifier`.
*/
public getSink(): ChunkSink<string, FileWritableStream> {
return (n) => {
const stream = this.createStream(n);
this._streamTracker.push(stream);
return stream;
};
}
private createStream(n: number) {
const path = this._options.file(n);
return this._options.storage.createWriteStream(path);
}
}
/**
* Limited FIFO queue. Each time the capacity is exceeded, the first element is removed
* and `onShift` is called with the removed element.
* @param capacity Maximum capacity.
*/
function limitedFifo<T>(capacity: number, onShift: (t: T) => void) {
const elements: T[] = [];
function push(element: T) {
elements.push(element);
if (elements.length > capacity) {
const first = elements.shift();
if (first) {
onShift(first);
}
}
}
return { elements: elements as readonly T[], push };
}
type LimitedFifo<T> = ReturnType<typeof limitedFifo<T>>;