Skip to content

Commit 541e7df

Browse files
committed
switch stream-upload api to use native SDK file-upload, calculate checksum
1 parent 1a8adfa commit 541e7df

12 files changed

Lines changed: 177 additions & 156 deletions

File tree

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/ignored
1+
*/ignored
22
/.env
33
/.docker.env
44
/src/web/badge/Cargo.lock

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/lib/docs_rs_storage/Cargo.toml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@ async-stream = { workspace = true }
2323
aws-config = { version = "1.0.0", default-features = false, features = ["default-https-client", "rt-tokio"] }
2424
aws-sdk-s3 = { version = "1.3.0", default-features = false, features = ["default-https-client", "rt-tokio"] }
2525
aws-smithy-types-convert = { version = "0.60.0", features = ["convert-chrono"] }
26+
base64 = { workspace = true }
2627
bzip2 = "0.6.0"
2728
chrono = { workspace = true }
29+
crc32fast = "1.4.2"
2830
dashmap = { version = "6.0.0", optional = true }
2931
docs_rs_config = { path = "../docs_rs_config" }
3032
docs_rs_env_vars = { path = "../docs_rs_env_vars" }
@@ -37,8 +39,6 @@ docs_rs_utils = { path = "../docs_rs_utils" }
3739
flate2 = "1.1.1"
3840
futures-util = { workspace = true }
3941
http = { workspace = true }
40-
http-body = "1.0.0"
41-
http-body-util = "0.1.3"
4242
itertools = { workspace = true }
4343
mime = { workspace = true }
4444
moka = { version = "0.12.14", features = ["future"] }
@@ -73,5 +73,9 @@ name = "archive_index_cache"
7373
harness = false
7474
required-features = ["testing"]
7575

76+
[[bench]]
77+
name = "crc32"
78+
harness = false
79+
7680
[lints]
7781
workspace = true
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
use criterion::{Criterion, Throughput, criterion_group, criterion_main};
2+
use docs_rs_storage::crc32_for_path;
3+
use std::{fs, hint::black_box};
4+
5+
pub fn crc32_file(c: &mut Criterion) {
6+
let fixture_path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
7+
8+
let fixture = vec![b'x'; 16 * 1024 * 1024];
9+
fs::write(&fixture_path, &fixture).unwrap();
10+
11+
let mut group = c.benchmark_group("crc32");
12+
group.throughput(Throughput::Bytes(fixture.len() as u64));
13+
group.bench_function("file_16mib", |b| {
14+
b.iter(|| crc32_for_path(black_box(&fixture_path)));
15+
});
16+
group.finish();
17+
}
18+
19+
criterion_group!(crc32_benches, crc32_file);
20+
criterion_main!(crc32_benches);

crates/lib/docs_rs_storage/src/backends/memory.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{
22
Blob,
33
backends::StorageBackendMethods,
4-
blob::{StreamUpload, StreamingBlob},
4+
blob::{StreamUpload, StreamUploadSource, StreamingBlob},
55
errors::PathNotFoundError,
66
metrics::StorageMetrics,
77
types::FileRange,
@@ -12,7 +12,7 @@ use dashmap::DashMap;
1212
use docs_rs_headers::compute_etag;
1313
use futures_util::stream::{self, BoxStream};
1414
use itertools::Itertools as _;
15-
use tokio::io;
15+
use tokio::fs;
1616

1717
pub(crate) struct MemoryBackend {
1818
otel_metrics: StorageMetrics,
@@ -56,16 +56,17 @@ impl StorageBackendMethods for MemoryBackend {
5656
compression,
5757
} = upload;
5858

59-
let mut content = source.reader().await?;
60-
let mut buffer = Vec::new();
61-
io::copy(&mut content, &mut buffer).await?;
59+
let content = match source {
60+
StreamUploadSource::Bytes(content) => content.to_vec(),
61+
StreamUploadSource::File(path) => fs::read(&path).await?,
62+
};
6263

6364
let blob = Blob {
6465
path,
6566
mime,
6667
date_updated: Utc::now(),
67-
etag: Some(compute_etag(&buffer)),
68-
content: buffer,
68+
etag: Some(compute_etag(&content)),
69+
content,
6970
compression,
7071
};
7172

crates/lib/docs_rs_storage/src/backends/s3.rs

Lines changed: 60 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use crate::{
22
Config,
33
backends::StorageBackendMethods,
4-
blob::{StreamUpload, StreamingBlob},
4+
blob::{StreamUpload, StreamUploadSource, StreamingBlob},
5+
crc32_for_path,
56
errors::PathNotFoundError,
67
metrics::StorageMetrics,
78
types::FileRange,
@@ -13,20 +14,17 @@ use aws_sdk_s3::{
1314
Client,
1415
config::{Region, retry::RetryConfig},
1516
error::{ProvideErrorMetadata, SdkError},
16-
primitives::ByteStream,
17-
types::{Delete, ObjectIdentifier},
17+
primitives::{ByteStream, Length},
18+
types::{ChecksumAlgorithm, Delete, ObjectIdentifier},
1819
};
1920
use aws_smithy_types_convert::date_time::DateTimeExt;
21+
use base64::{Engine as _, engine::general_purpose::STANDARD as b64};
2022
use chrono::Utc;
2123
use docs_rs_headers::{ETag, compute_etag};
22-
use futures_util::{
23-
TryStreamExt,
24-
stream::{BoxStream, StreamExt},
25-
};
26-
use http_body::Frame;
27-
use http_body_util::StreamBody;
24+
use docs_rs_utils::spawn_blocking;
25+
use futures_util::stream::{BoxStream, StreamExt};
2826
use opentelemetry::KeyValue;
29-
use tokio_util::io::ReaderStream;
27+
use tokio::fs;
3028
use tracing::{error, warn};
3129

3230
// error codes to check for when trying to determine if an error is
@@ -46,6 +44,8 @@ static NOT_FOUND_ERROR_CODES: [&str; 5] = [
4644
"XMinioInvalidObjectName",
4745
];
4846

47+
const S3_UPLOAD_BUFFER_SIZE: usize = 1024 * 1024; // 1 MiB
48+
4949
trait S3ResultExt<T> {
5050
fn convert_errors(self) -> anyhow::Result<T>;
5151
}
@@ -243,34 +243,74 @@ impl StorageBackendMethods for S3Backend {
243243
compression,
244244
} = upload;
245245

246-
let content_length = source.content_length().await?;
246+
let (content_length, checksum_crc32) = match &source {
247+
StreamUploadSource::Bytes(bytes) => (bytes.len() as u64, None),
248+
StreamUploadSource::File(local_path) => {
249+
let local_path = local_path.clone();
250+
251+
(
252+
fs::metadata(&local_path).await?.len(),
253+
Some(
254+
spawn_blocking(move || Ok(b64.encode(crc32_for_path(local_path)?))).await?,
255+
),
256+
)
257+
}
258+
};
247259

248260
let mut last_err = None;
249261

250262
for attempt in 1..=3 {
251-
let reader = source.reader().await?;
252-
let stream = ReaderStream::new(reader).map_ok(Frame::data);
263+
let body = match &source {
264+
StreamUploadSource::Bytes(bytes) => ByteStream::from(bytes.clone()),
265+
StreamUploadSource::File(path) => {
266+
// NOTE:
267+
// reading the upload-data from a local path is
268+
// "retryable" in the AWS SDK sense.
269+
// ".file" (file pointer) is not retryable.
270+
ByteStream::read_from()
271+
.path(path)
272+
.buffer_size(S3_UPLOAD_BUFFER_SIZE)
273+
.length(Length::Exact(content_length))
274+
.build()
275+
.await?
276+
}
277+
};
253278

254-
match self
279+
let mut request = self
255280
.client
256281
.put_object()
257282
.bucket(&self.bucket)
258283
.key(&path)
259-
.body(ByteStream::from_body_1_x(StreamBody::new(stream)))
284+
.body(body)
260285
.content_length(content_length as i64)
261286
.content_type(mime.to_string())
262-
.set_content_encoding(compression.map(|alg| alg.to_string()))
263-
.send()
264-
.await
265-
{
287+
.set_content_encoding(compression.map(|alg| alg.to_string()));
288+
289+
// NOTE: when you try to stream-upload a local file, the AWS SDK by default
290+
// uses a "middleware" to calculate the checksum for the content, to compare it after
291+
// uploading.
292+
// This piece is broken right now, but only when using S3 directly. On minio, all is
293+
fine.
294+
// I don't want to disable checksums so we're sure the files are uploaded correctly.
295+
// So the only alternative (outside of trying to fix the SDK) is to calculate the
296+
// checksum ourselves. This is a little annoying because this means we have to read the
297+
// whole file before upload. But since I don't want to load all files into memory before
298+
// upload, this is the only option.
299+
if let Some(checksum_crc32) = &checksum_crc32 {
300+
request = request
301+
.checksum_algorithm(ChecksumAlgorithm::Crc32)
302+
.checksum_crc32(checksum_crc32);
303+
}
304+
305+
match request.send().await {
266306
Ok(_) => {
267307
self.otel_metrics
268308
.uploaded_files
269309
.add(1, &[KeyValue::new("attempt", attempt.to_string())]);
270310
return Ok(());
271311
}
272312
Err(err) => {
273-
warn!(?err, attempt = attempt + 1, %path, "failed to upload blob to S3");
313+
warn!(?err, attempt, %path, "failed to upload blob to S3");
274314
last_err = Some(err);
275315
}
276316
}

crates/lib/docs_rs_storage/src/blob.rs

Lines changed: 6 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -4,39 +4,13 @@ use chrono::{DateTime, Utc};
44
use docs_rs_headers::{ETag, compute_etag};
55
use docs_rs_types::CompressionAlgorithm;
66
use mime::Mime;
7-
use std::{
8-
fmt,
9-
io::{Cursor, SeekFrom},
10-
sync::Arc,
11-
};
12-
use tokio::{
13-
fs,
14-
io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncSeekExt},
15-
};
7+
use std::{fmt, io::Cursor, path::PathBuf};
8+
use tokio::io::{self, AsyncBufRead, AsyncBufReadExt};
9+
use tokio_util::bytes::Bytes;
1610

1711
pub enum StreamUploadSource {
18-
Bytes(Arc<[u8]>),
19-
File(fs::File),
20-
}
21-
22-
impl StreamUploadSource {
23-
pub async fn reader(&self) -> io::Result<Box<dyn AsyncRead + Unpin + Send + Sync>> {
24-
Ok(match self {
25-
Self::Bytes(bytes) => Box::new(Cursor::new(bytes.clone())),
26-
Self::File(file) => {
27-
let mut cloned = file.try_clone().await?;
28-
cloned.seek(SeekFrom::Start(0)).await?;
29-
Box::new(cloned)
30-
}
31-
})
32-
}
33-
34-
pub async fn content_length(&self) -> io::Result<u64> {
35-
Ok(match self {
36-
Self::Bytes(bytes) => bytes.len() as u64,
37-
Self::File(file) => file.metadata().await?.len(),
38-
})
39-
}
12+
Bytes(Bytes),
13+
File(PathBuf),
4014
}
4115

4216
/// Represents a stream blob to be uploaded to storage.
@@ -59,7 +33,7 @@ impl From<BlobUpload> for StreamUpload {
5933
Self {
6034
path: value.path,
6135
mime: value.mime,
62-
source: StreamUploadSource::Bytes(Arc::from(value.content)),
36+
source: StreamUploadSource::Bytes(value.content.into()),
6337
compression: value.compression,
6438
}
6539
}
@@ -200,10 +174,6 @@ mod test {
200174
use super::*;
201175
use crate::compress_async;
202176
use docs_rs_headers::compute_etag;
203-
use tokio::{
204-
fs,
205-
io::{AsyncReadExt as _, AsyncWriteExt as _},
206-
};
207177

208178
const ZSTD_EOF_BYTES: [u8; 3] = [0x01, 0x00, 0x00];
209179

@@ -246,52 +216,6 @@ mod test {
246216
Ok(())
247217
}
248218

249-
#[tokio::test]
250-
async fn test_stream_upload_source_bytes_creates_fresh_readers() -> Result<()> {
251-
const CONTENT: &[u8] = b"Hello, world!";
252-
253-
let source = StreamUploadSource::Bytes(Arc::from(CONTENT));
254-
assert_eq!(source.content_length().await?, CONTENT.len() as u64);
255-
256-
let mut first = source.reader().await?;
257-
let mut first_buf = Vec::new();
258-
first.read_to_end(&mut first_buf).await?;
259-
assert_eq!(first_buf, CONTENT);
260-
261-
let mut second = source.reader().await?;
262-
let mut second_buf = Vec::new();
263-
second.read_to_end(&mut second_buf).await?;
264-
assert_eq!(second_buf, CONTENT);
265-
266-
Ok(())
267-
}
268-
269-
#[tokio::test]
270-
async fn test_stream_upload_source_file_creates_fresh_readers() -> Result<()> {
271-
const CONTENT: &[u8] = b"Hello, world!";
272-
273-
let tempfile = tempfile::NamedTempFile::new()?;
274-
let mut file = fs::File::from_std(tempfile.reopen()?);
275-
file.write_all(CONTENT).await?;
276-
file.seek(std::io::SeekFrom::Start(CONTENT.len() as u64))
277-
.await?;
278-
279-
let source = StreamUploadSource::File(file);
280-
assert_eq!(source.content_length().await?, CONTENT.len() as u64);
281-
282-
let mut first = source.reader().await?;
283-
let mut first_buf = Vec::new();
284-
first.read_to_end(&mut first_buf).await?;
285-
assert_eq!(first_buf, CONTENT);
286-
287-
let mut second = source.reader().await?;
288-
let mut second_buf = Vec::new();
289-
second.read_to_end(&mut second_buf).await?;
290-
assert_eq!(second_buf, CONTENT);
291-
292-
Ok(())
293-
}
294-
295219
#[tokio::test]
296220
async fn test_streaming_broken_zstd_blob() -> Result<()> {
297221
const NOT_ZSTD: &[u8] = b"Hello, world!";

crates/lib/docs_rs_storage/src/config.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,6 @@ impl AppConfig for ArchiveIndexCacheConfig {
7575

7676
#[derive(Debug)]
7777
pub struct Config {
78-
pub temp_dir: PathBuf,
79-
8078
// Storage params
8179
pub storage_backend: StorageKind,
8280

@@ -117,7 +115,6 @@ impl AppConfig for Config {
117115
let cores = std::thread::available_parallelism()?.get();
118116

119117
Ok(Self {
120-
temp_dir: prefix.join("tmp"),
121118
storage_backend: env("DOCSRS_STORAGE_BACKEND", StorageKind::default())?,
122119
aws_sdk_max_retries: env("DOCSRS_AWS_SDK_MAX_RETRIES", 6u32)?,
123120
s3_bucket: env("DOCSRS_S3_BUCKET", "rust-docs-rs".to_string())?,

crates/lib/docs_rs_storage/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub use storage::blocking::Storage;
2222
pub use storage::non_blocking::AsyncStorage;
2323
pub use types::StorageKind;
2424
pub use utils::{
25+
crc32::crc32_for_path,
2526
file_list::get_file_list,
2627
storage_path::{rustdoc_archive_path, rustdoc_json_path, source_archive_path},
2728
};

0 commit comments

Comments
 (0)