diff --git a/apps/desktop/src-tauri/src/http_client.rs b/apps/desktop/src-tauri/src/http_client.rs index 47774eead0..bebaf88966 100644 --- a/apps/desktop/src-tauri/src/http_client.rs +++ b/apps/desktop/src-tauri/src/http_client.rs @@ -8,7 +8,13 @@ pub struct HttpClient(reqwest::Client); impl Default for HttpClient { fn default() -> Self { - Self(reqwest::Client::new()) + Self( + reqwest::Client::builder() + .connect_timeout(std::time::Duration::from_secs(10)) + .timeout(std::time::Duration::from_secs(30)) + .build() + .expect("Failed to build HTTP client"), + ) } } @@ -26,21 +32,17 @@ impl Default for RetryableHttpClient { fn default() -> Self { Self( reqwest::Client::builder() + .connect_timeout(std::time::Duration::from_secs(10)) .retry( reqwest::retry::always() - .classify_fn(|req_rep| { - match req_rep.status() { - // Server errors - Some(s) - if s.is_server_error() - || s == StatusCode::TOO_MANY_REQUESTS => - { - req_rep.retryable() - } - // Network errors - None => req_rep.retryable(), - _ => req_rep.success(), + .classify_fn(|req_rep| match req_rep.status() { + Some(s) + if s.is_server_error() || s == StatusCode::TOO_MANY_REQUESTS => + { + req_rep.retryable() } + None => req_rep.retryable(), + _ => req_rep.success(), }) .max_retries_per_request(5) .max_extra_load(5.0), diff --git a/apps/desktop/src-tauri/src/lib.rs b/apps/desktop/src-tauri/src/lib.rs index 482a47d328..eccb7e1956 100644 --- a/apps/desktop/src-tauri/src/lib.rs +++ b/apps/desktop/src-tauri/src/lib.rs @@ -1645,6 +1645,55 @@ pub(crate) async fn create_screenshot( result } +pub(crate) async fn create_screenshot_source_from_segments( + segments_dir: &std::path::Path, +) -> Result { + let init_path = segments_dir.join("init.mp4"); + if !init_path.exists() { + return Err(format!("init.mp4 not found in {}", segments_dir.display())); + } + + let first_segment = find_first_segment(segments_dir) + .ok_or_else(|| format!("No .m4s segments found in {}", segments_dir.display()))?; + + let temp_path = segments_dir.join(".screenshot_source.mp4"); + let mut out = tokio::fs::File::create(&temp_path) + .await + .map_err(|e| format!("Failed to create screenshot source: {e}"))?; + + let mut init_file = tokio::fs::File::open(&init_path) + .await + .map_err(|e| format!("Failed to open init.mp4: {e}"))?; + tokio::io::copy(&mut init_file, &mut out) + .await + .map_err(|e| format!("Failed to copy init.mp4: {e}"))?; + + let mut seg_file = tokio::fs::File::open(&first_segment) + .await + .map_err(|e| format!("Failed to open {}: {e}", first_segment.display()))?; + tokio::io::copy(&mut seg_file, &mut out) + .await + .map_err(|e| format!("Failed to copy segment: {e}"))?; + + Ok(temp_path) +} + +fn find_first_segment(dir: &std::path::Path) -> Option { + let mut segments: Vec = std::fs::read_dir(dir) + .ok()? + .filter_map(|e| { + let path = e.ok()?.path(); + if path.extension().is_some_and(|ext| ext == "m4s") { + Some(path) + } else { + None + } + }) + .collect(); + segments.sort(); + segments.into_iter().next() +} + // async fn create_thumbnail(input: PathBuf, output: PathBuf, size: (u32, u32)) -> Result<(), String> { // println!("Creating thumbnail: input={input:?}, output={output:?}, size={size:?}"); diff --git a/apps/desktop/src-tauri/src/recording.rs b/apps/desktop/src-tauri/src/recording.rs index b37983078b..c933ab89c2 100644 --- a/apps/desktop/src-tauri/src/recording.rs +++ b/apps/desktop/src-tauri/src/recording.rs @@ -5,8 +5,8 @@ use cap_project::cursor::SHORT_CURSOR_SHAPE_DEBOUNCE_MS; use cap_project::{ CameraShape, CursorClickEvent, GlideDirection, InstantRecordingMeta, MultipleSegments, Platform, ProjectConfiguration, RecordingMeta, RecordingMetaInner, SharingMeta, - StudioRecordingMeta, StudioRecordingStatus, TimelineConfiguration, TimelineSegment, UploadMeta, - ZoomMode, ZoomSegment, cursor::CursorEvents, + StudioRecordingMeta, StudioRecordingStatus, TimelineConfiguration, TimelineSegment, ZoomMode, + ZoomSegment, cursor::CursorEvents, }; #[cfg(target_os = "macos")] use cap_recording::SendableShareableContent; @@ -60,14 +60,12 @@ use crate::{ api::PresignedS3PutRequestMethod, audio::AppSounds, auth::AuthStore, - create_screenshot, + create_screenshot, create_screenshot_source_from_segments, general_settings::{GeneralSettingsStore, PostDeletionBehaviour, PostStudioRecordingBehaviour}, open_external_link, presets::PresetsStore, thumbnails::*, - upload::{ - InstantMultipartUpload, SegmentUploader, build_video_meta, compress_image, upload_video, - }, + upload::{InstantMultipartUpload, SegmentUploader, compress_image}, web_api::ManagerExt, windows::{CapWindowId, ShowCapWindow}, }; @@ -79,6 +77,11 @@ pub struct InProgressRecordingCommon { pub recording_dir: PathBuf, } +pub struct StopFailureContext { + pub segment_upload: SegmentUploader, + pub video_upload_info: VideoUploadInfo, +} + pub enum InProgressRecording { Instant { handle: instant_recording::ActorHandle, @@ -192,25 +195,39 @@ impl InProgressRecording { } } - pub async fn stop(self) -> anyhow::Result { - Ok(match self { + pub async fn stop( + self, + ) -> Result)> { + match self { Self::Instant { handle, segment_upload, video_upload_info, common, .. - } => CompletedRecording::Instant { - recording: handle.stop().await?, - segment_upload, - video_upload_info, - target_name: common.target_name, + } => match handle.stop().await { + Ok(recording) => Ok(CompletedRecording::Instant { + recording, + segment_upload, + video_upload_info, + target_name: common.target_name, + }), + Err(e) => Err(( + e, + Some(StopFailureContext { + segment_upload, + video_upload_info, + }), + )), }, - Self::Studio { handle, common, .. } => CompletedRecording::Studio { - recording: handle.stop().await?, - target_name: common.target_name, + Self::Studio { handle, common, .. } => match handle.stop().await { + Ok(recording) => Ok(CompletedRecording::Studio { + recording, + target_name: common.target_name, + }), + Err(e) => Err((e, None)), }, - }) + } } pub fn done_fut(&self) -> cap_recording::DoneFut { @@ -1300,10 +1317,21 @@ pub async fn stop_recording(app: AppHandle, state: MutableState<'_, App>) -> Res return Err("Recording not in progress".to_string())?; }; - let completed_recording = current_recording.stop().await.map_err(|e| e.to_string())?; - let recording_dir = completed_recording.project_path().clone(); + let recording_dir = current_recording.recording_dir().clone(); - handle_recording_end(app, Ok(completed_recording), &mut state, recording_dir).await?; + let recording_outcome = match current_recording.stop().await { + Ok(completed) => Ok(completed), + Err((e, ctx)) => { + error!("Recording stop failed: {e:#}"); + if let Some(ctx) = ctx { + ctx.segment_upload.handle.abort(); + crate::upload::emit_upload_complete(&app, &ctx.video_upload_info.id); + } + Err(e.to_string()) + } + }; + + handle_recording_end(app, recording_outcome, &mut state, recording_dir).await?; Ok(()) } @@ -1817,14 +1845,27 @@ async fn handle_recording_finish( } let app = app.clone(); - let output_path = recording_dir.join("content/output.mp4"); + let segments_dir = recording_dir.join("content/display"); let display_screenshot = screenshots_dir.join("display.jpg"); - let screenshot_task = tokio::spawn(create_screenshot( - output_path.clone(), - display_screenshot.clone(), - None, - )); + let screenshot_task = tokio::spawn({ + let segments_dir = segments_dir.clone(); + let display_screenshot = display_screenshot.clone(); + async move { + let screenshot_source: Result = + create_screenshot_source_from_segments(&segments_dir).await; + match screenshot_source { + Ok(temp_path) => { + let result = + create_screenshot(temp_path.clone(), display_screenshot, None) + .await; + let _ = tokio::fs::remove_file(&temp_path).await; + result + } + Err(e) => Err(format!("Failed to create screenshot source: {e}")), + } + } + }); let _ = open_external_link(app.clone(), video_upload_info.link.clone()); @@ -1833,109 +1874,60 @@ async fn handle_recording_finish( let recording_dir = recording_dir.clone(); async move { - let video_upload_succeeded = match segment_upload + let upload_succeeded = segment_upload .handle .await .map_err(|e| e.to_string()) .and_then(|r| r.map_err(|v| v.to_string())) - { - Ok(()) => { - info!("Segment upload succeeded"); - true - } - Err(e) => { - error!("Segment upload failed: {}", e); - false - } - }; + .is_ok(); + + if upload_succeeded { + info!("Segment upload succeeded"); + } else { + crate::upload::emit_upload_complete(&app, &video_upload_info.id); + } let _ = screenshot_task.await; - if video_upload_succeeded { - if let Ok(bytes) = - compress_image(display_screenshot).await - .map_err(|err| - error!("Error compressing thumbnail for instant mode progressive upload: {err}") - ) { - let res = crate::upload::singlepart_uploader( - app.clone(), - crate::api::PresignedS3PutRequest { - video_id: video_upload_info.id.clone(), - subpath: "screenshot/screen-capture.jpg".to_string(), - method: PresignedS3PutRequestMethod::Put, - meta: None, - }, - bytes.len() as u64, - stream::once(async move { Ok::<_, std::io::Error>(bytes::Bytes::from(bytes)) }), + if upload_succeeded + && let Ok(bytes) = + compress_image(display_screenshot).await.map_err(|err| { + error!( + "Error compressing thumbnail for instant mode progressive upload: {err}" ) - .await; - if let Err(err) = res { - error!("Error updating thumbnail for instant mode progressive upload: {err}"); - return; - } - - if GeneralSettingsStore::get(&app).ok().flatten().unwrap_or_default().delete_instant_recordings_after_upload && let Err(err) = tokio::fs::remove_dir_all(&recording_dir).await { - error!("Failed to remove recording files after upload: {err:?}"); - } - - } - } else { - let meta = match build_video_meta(&output_path) { - Ok(m) => Some(m), - Err(err) => { - error!("Error getting video metadata: {err}"); - warn!( - "Attempting to repair corrupt recording before fallback upload" - ); - match crate::upload::try_repair_corrupt_mp4(&output_path) { - Ok(()) => { - info!("Repair succeeded, retrying metadata extraction"); - build_video_meta(&output_path) - .map_err(|e| { - error!("Still unreadable after repair: {e}") - }) - .ok() - } - Err(e) => { - error!("Repair failed: {e}"); - None - } - } - } - }; - - if let Some(meta) = meta { - upload_video( - &app, - video_upload_info.id.clone(), - output_path, - display_screenshot.clone(), - meta, - None, - ) - .await - .map(|_| { - info!("Final video upload with screenshot completed successfully") }) - .map_err(|error| { - error!("Error in upload_video: {error}"); - - if let Ok(mut meta) = - RecordingMeta::load_for_project(&recording_dir) - { - meta.upload = Some(UploadMeta::Failed { - error: error.to_string(), - }); - meta.save_for_project() - .map_err(|e| format!("Failed to save recording meta: {e}")) - .ok(); - } - }) - .ok(); - } else { - crate::upload::emit_upload_complete(&app, &video_upload_info.id); + { + let res = crate::upload::singlepart_uploader( + app.clone(), + crate::api::PresignedS3PutRequest { + video_id: video_upload_info.id.clone(), + subpath: "screenshot/screen-capture.jpg".to_string(), + method: PresignedS3PutRequestMethod::Put, + meta: None, + }, + bytes.len() as u64, + stream::once(async move { + Ok::<_, std::io::Error>(bytes::Bytes::from(bytes)) + }), + ) + .await; + if let Err(err) = res { + error!( + "Error updating thumbnail for instant mode progressive upload: {err}" + ); } } + + if upload_succeeded + && GeneralSettingsStore::get(&app) + .ok() + .flatten() + .unwrap_or_default() + .delete_instant_recordings_after_upload + && let Err(err) = tokio::fs::remove_dir_all(&recording_dir).await + { + error!("Failed to remove recording files after upload: {err:?}"); + } } }); diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs index 6a09c81429..eaf30f877e 100644 --- a/apps/desktop/src-tauri/src/upload.rs +++ b/apps/desktop/src-tauri/src/upload.rs @@ -1135,6 +1135,22 @@ impl SegmentUploader { } } + { + let s = state.lock().unwrap_or_else(|e| e.into_inner()); + if !s.failed_segments.is_empty() { + let missing: Vec<_> = s + .failed_segments + .iter() + .map(|f| f.subpath.as_str()) + .collect(); + error!( + count = s.failed_segments.len(), + segments = ?missing, + "Completing upload with missing segments - video may have gaps" + ); + } + } + let final_manifest = state .lock() .unwrap_or_else(|e| e.into_inner()) @@ -1173,6 +1189,8 @@ impl SegmentUploader { meta.save_for_project().ok(); } + emit_upload_complete(&app, &video_id); + return Err(format!( "Failed to signal recording complete for {video_id} after 3 attempts" ) diff --git a/crates/enc-avfoundation/src/mp4.rs b/crates/enc-avfoundation/src/mp4.rs index 55b10936e4..fb1555343b 100644 --- a/crates/enc-avfoundation/src/mp4.rs +++ b/crates/enc-avfoundation/src/mp4.rs @@ -1911,10 +1911,10 @@ mod tests { let timestamp = Duration::from_millis(ts_ms); let frame = create_test_video_frame(&pool, (ts_ms as i64) * 1000, 33_333); - if let Ok(()) = encoder.queue_video_frame(frame, timestamp) { - if ts_ms > threshold_ms { - accepted_past_threshold += 1; - } + if let Ok(()) = encoder.queue_video_frame(frame, timestamp) + && ts_ms > threshold_ms + { + accepted_past_threshold += 1; } } diff --git a/crates/recording/src/instant_recording.rs b/crates/recording/src/instant_recording.rs index 3ed2fe2062..e1b53d046f 100644 --- a/crates/recording/src/instant_recording.rs +++ b/crates/recording/src/instant_recording.rs @@ -28,7 +28,6 @@ struct Pipeline { audio: Option, video_info: VideoInfo, segments_dir: PathBuf, - audio_path: Option, segment_rx: Option>, } @@ -160,75 +159,45 @@ impl Message for Actor { } } - let (wall_clock_duration, segments_dir, audio_path) = + let segments_dir = replace_with::replace_with_or_abort_and_return(&mut self.state, |state| { let result = match &state { - ActorState::Recording { - pipeline, - segment_start_time, - .. - } - | ActorState::Paused { - pipeline, - segment_start_time, - .. - } => { - let dur = std::time::Duration::from_secs_f64( - current_time_f64() - segment_start_time, - ); - ( - dur, - pipeline.segments_dir.clone(), - pipeline.audio_path.clone(), - ) - } - ActorState::Stopped => ( - std::time::Duration::ZERO, - self.recording_dir.join("content").join("display"), - None, - ), + ActorState::Recording { pipeline, .. } + | ActorState::Paused { pipeline, .. } => pipeline.segments_dir.clone(), + ActorState::Stopped => self.recording_dir.join("content").join("display"), }; (result, state) }); - let wall_clock_duration = wall_clock_duration.saturating_sub(self.total_pause_duration); self.stop().await?; - let output_path = self.recording_dir.join("content").join("output.mp4"); - - let health = tokio::task::spawn_blocking(move || { - let assembly_result = - assemble_canonical_mp4(&segments_dir, audio_path.as_deref(), &output_path); - - match assembly_result { - Ok(()) => { - let validation = crate::output_validation::validate_instant_recording( - &output_path, - wall_clock_duration, - ); - match &validation.health { - crate::RecordingHealth::Healthy => { - debug!("Instant recording output validated as healthy"); - } - crate::RecordingHealth::Degraded { issues } => { - warn!(?issues, "Instant recording output has quality issues"); - } - _ => {} - } - validation.health - } + let has_init = segments_dir.join("init.mp4").exists(); + let has_segments = has_init + && match std::fs::read_dir(&segments_dir) { + Ok(entries) => entries + .filter_map(Result::ok) + .any(|e| e.path().extension().is_some_and(|ext| ext == "m4s")), Err(e) => { - error!("Failed to assemble canonical MP4: {e:#}"); - crate::RecordingHealth::Damaged { - reason: format!("Segment assembly failed: {e}"), - } + warn!( + path = %segments_dir.display(), + error = %e, + "Failed to read segments directory, treating as no segments" + ); + false } + }; + + let health = if has_segments { + crate::RecordingHealth::Healthy + } else if has_init { + crate::RecordingHealth::Degraded { + issues: vec!["Recording too short — no complete segments produced".to_string()], } - }) - .await - .unwrap_or_else(|e| crate::RecordingHealth::Damaged { - reason: format!("Assembly task panicked: {e}"), - }); + } else { + crate::RecordingHealth::Damaged { + reason: "No video segments produced".to_string(), + } + }; Ok(CompletedRecording { project_path: self.recording_dir.clone(), @@ -333,164 +302,6 @@ pub struct CompletedRecording { pub health: crate::RecordingHealth, } -fn assemble_canonical_mp4( - segments_dir: &std::path::Path, - audio_path: Option<&std::path::Path>, - output_path: &std::path::Path, -) -> anyhow::Result<()> { - use cap_enc_ffmpeg::remux::{ - concatenate_m4s_segments_with_init, merge_video_audio, probe_media_valid, - }; - - let init_path = segments_dir.join("init.mp4"); - if !init_path.exists() { - return Err(anyhow::anyhow!( - "Video init segment not found at {}", - init_path.display() - )); - } - - let manifest_path = segments_dir.join("manifest.json"); - let segment_paths = if manifest_path.exists() { - read_segment_paths_from_manifest(&manifest_path, segments_dir)? - } else { - discover_segment_files(segments_dir)? - }; - - if segment_paths.is_empty() { - return Err(anyhow::anyhow!( - "No video segments found in {}", - segments_dir.display() - )); - } - - info!( - segment_count = segment_paths.len(), - segments_dir = %segments_dir.display(), - has_audio = audio_path.is_some(), - "Assembling canonical MP4 from segments" - ); - - let video_only_path = if audio_path.is_some() { - output_path.with_extension("video_only.mp4") - } else { - output_path.to_path_buf() - }; - - concatenate_m4s_segments_with_init(&init_path, &segment_paths, &video_only_path) - .map_err(|e| anyhow::anyhow!("Video segment concatenation failed: {e}"))?; - - let resolved_audio = audio_path.and_then(|audio| { - if audio.is_file() && probe_media_valid(audio) { - return Some(audio.to_path_buf()); - } - - if audio.is_dir() { - let audio_init = audio.join("init.mp4"); - if !audio_init.exists() { - warn!( - "Audio init segment not found at {}, skipping audio", - audio_init.display() - ); - return None; - } - - let audio_manifest = audio.join("manifest.json"); - let audio_segments = if audio_manifest.exists() { - read_segment_paths_from_manifest(&audio_manifest, audio).ok() - } else { - discover_segment_files(audio).ok() - }; - - if let Some(segments) = audio_segments - && !segments.is_empty() - { - let assembled_audio = output_path.with_extension("audio_assembled.m4a"); - match concatenate_m4s_segments_with_init(&audio_init, &segments, &assembled_audio) { - Ok(()) => { - if probe_media_valid(&assembled_audio) { - return Some(assembled_audio); - } - warn!("Assembled audio file is not valid, skipping audio"); - let _ = std::fs::remove_file(&assembled_audio); - } - Err(e) => { - warn!("Failed to assemble audio segments: {e}"); - } - } - } - } - - None - }); - - if let Some(ref audio_file) = resolved_audio { - let merge_result = merge_video_audio(&video_only_path, audio_file, output_path); - let _ = std::fs::remove_file(&video_only_path); - if audio_file != audio_path.unwrap_or(audio_file.as_path()) { - let _ = std::fs::remove_file(audio_file); - } - merge_result.map_err(|e| anyhow::anyhow!("Audio/video merge failed: {e}"))?; - } else if audio_path.is_some() { - if video_only_path != output_path { - std::fs::rename(&video_only_path, output_path)?; - } - warn!("Audio file missing or invalid, output will be video-only"); - } - - info!(output = %output_path.display(), "Canonical MP4 assembled"); - Ok(()) -} - -fn read_segment_paths_from_manifest( - manifest_path: &std::path::Path, - base_dir: &std::path::Path, -) -> anyhow::Result> { - let manifest_text = std::fs::read_to_string(manifest_path)?; - let manifest: serde_json::Value = serde_json::from_str(&manifest_text)?; - - let segments = manifest - .get("segments") - .and_then(|v| v.as_array()) - .ok_or_else(|| anyhow::anyhow!("Invalid manifest: no segments array"))?; - - let mut paths = Vec::new(); - for seg in segments { - let is_complete = seg - .get("is_complete") - .and_then(|v| v.as_bool()) - .unwrap_or(false); - if !is_complete { - continue; - } - if let Some(path_str) = seg.get("path").and_then(|v| v.as_str()) { - let full_path = base_dir.join(path_str); - if full_path.exists() { - paths.push(full_path); - } - } - } - - Ok(paths) -} - -fn discover_segment_files(segments_dir: &std::path::Path) -> anyhow::Result> { - let mut segments: Vec = std::fs::read_dir(segments_dir)? - .filter_map(|entry| { - let entry = entry.ok()?; - let path = entry.path(); - if path.extension().is_some_and(|ext| ext == "m4s") { - Some(path) - } else { - None - } - }) - .collect(); - - segments.sort(); - Ok(segments) -} - async fn create_pipeline( content_dir: PathBuf, screen_capture: crate::sources::screen_capture::VideoSourceConfig, @@ -543,7 +354,7 @@ async fn create_pipeline( .await?; let has_audio = mic_feed.is_some() || system_audio_source.is_some(); - let (audio, audio_path) = if has_audio { + let audio = if has_audio { let audio_dir = content_dir.join("audio"); let mut builder = output_pipeline::OutputPipeline::builder(audio_dir.clone()).with_timestamps(start_time); @@ -570,9 +381,9 @@ async fn create_pipeline( .await .context("audio pipeline setup")?; - (Some(audio_pipeline), Some(audio_dir)) + Some(audio_pipeline) } else { - (None, None) + None }; let segment_rx = segment_channel.map(|(_, rx)| rx); @@ -587,7 +398,6 @@ async fn create_pipeline( screen_info.fps(), ), segments_dir, - audio_path, segment_rx, }) } @@ -738,7 +548,6 @@ pub async fn spawn_instant_recording_actor( audio: None, video_info, segments_dir: content_dir.clone(), - audio_path: None, segment_rx: None, }, video_info, diff --git a/crates/recording/src/output_pipeline/core.rs b/crates/recording/src/output_pipeline/core.rs index 83e1b2120b..03170d033a 100644 --- a/crates/recording/src/output_pipeline/core.rs +++ b/crates/recording/src/output_pipeline/core.rs @@ -1193,9 +1193,15 @@ fn spawn_video_encoder, TVideo: V stop_token.cancelled().await; - if let Err(e) = video_source.stop().await { - error!("Video source stop failed: {e:#}"); - }; + match tokio::time::timeout(Duration::from_secs(5), video_source.stop()).await { + Ok(Err(e)) => { + error!("Video source stop failed: {e:#}"); + } + Err(_) => { + error!("Video source stop timed out after 5s, proceeding with shutdown"); + } + Ok(Ok(())) => {} + } Ok(()) } @@ -1284,50 +1290,66 @@ fn spawn_video_encoder, TVideo: V info!("mux-video cancelled, draining remaining frames from channel"); let drain_start = std::time::Instant::now(); let drain_timeout = Duration::from_secs(2); + let drain_deadline = tokio::time::Instant::now() + drain_timeout; let max_drain_frames = 500u64; let mut drained = 0u64; let mut skipped = 0u64; let mut hit_limit = false; - while let Some(frame) = video_rx.next().await { - frame_count += 1; - - if drain_start.elapsed() > drain_timeout || drained >= max_drain_frames { + loop { + if drained >= max_drain_frames { hit_limit = true; break; } - drained += 1; - - let timestamp = frame.timestamp(); + match tokio::time::timeout_at(drain_deadline, video_rx.next()).await { + Ok(Some(frame)) => { + frame_count += 1; + drained += 1; - if let Some(first_tx) = first_tx.take() { - let _ = first_tx.send(timestamp); - } + let timestamp = frame.timestamp(); - let raw_duration = match anomaly_tracker.process_timestamp(timestamp, timestamps) { - Ok(d) => d, - Err(_) => { - warn!("Timestamp anomaly during drain, skipping frame"); - skipped += 1; - continue; - } - }; + if let Some(first_tx) = first_tx.take() { + let _ = first_tx.send(timestamp); + } - if anomaly_tracker.take_resync_flag() { - drift_tracker.reset_baseline(); - } + let raw_duration = + match anomaly_tracker.process_timestamp(timestamp, timestamps) { + Ok(d) => d, + Err(_) => { + warn!("Timestamp anomaly during drain, skipping frame"); + skipped += 1; + continue; + } + }; - let raw_wall_clock = timestamps.instant().elapsed(); - let total_pause = shared_pause.total_pause_duration(); - let wall_clock_elapsed = raw_wall_clock.saturating_sub(total_pause); - let duration = drift_tracker.calculate_timestamp(raw_duration, wall_clock_elapsed); + if anomaly_tracker.take_resync_flag() { + drift_tracker.reset_baseline(); + } - match muxer.lock().await.send_video_frame(frame, duration) { - Ok(()) => {} - Err(e) => { - warn!("Error processing drained frame: {e}"); - skipped += 1; + let raw_wall_clock = timestamps.instant().elapsed(); + let total_pause = shared_pause.total_pause_duration(); + let wall_clock_elapsed = raw_wall_clock.saturating_sub(total_pause); + let duration = + drift_tracker.calculate_timestamp(raw_duration, wall_clock_elapsed); + + match muxer.lock().await.send_video_frame(frame, duration) { + Ok(()) => {} + Err(e) => { + warn!("Error processing drained frame: {e}"); + skipped += 1; + } + } + } + Ok(None) => break, + Err(_) => { + hit_limit = true; + warn!( + "mux-video drain timed out after {:?}, closing channel", + drain_start.elapsed() + ); + video_rx.close(); + break; } } } @@ -1686,11 +1708,39 @@ impl OutputPipeline { pub async fn stop(mut self) -> anyhow::Result { drop(self.stop_token.take()); - self.done_fut.await?; + const PIPELINE_STOP_TIMEOUT: Duration = Duration::from_secs(10); + match tokio::time::timeout(PIPELINE_STOP_TIMEOUT, self.done_fut.clone()).await { + Ok(res) => res?, + Err(_) => { + return Err(anyhow!( + "Pipeline stop timed out after {}s — tasks may still be running", + PIPELINE_STOP_TIMEOUT.as_secs() + )); + } + } + + let first_timestamp = match tokio::time::timeout( + Duration::from_secs(1), + self.first_timestamp_rx, + ) + .await + { + Ok(Ok(ts)) => ts, + Ok(Err(_)) => { + warn!( + "first_timestamp channel was dropped without sending a value, defaulting to now" + ); + Timestamp::Instant(Instant::now()) + } + Err(_) => { + warn!("first_timestamp receive timed out after 1s, defaulting to now"); + Timestamp::Instant(Instant::now()) + } + }; Ok(FinishedOutputPipeline { path: self.path, - first_timestamp: self.first_timestamp_rx.await?, + first_timestamp, video_info: self.video_info, video_frame_count: self.video_frame_count.load(Ordering::Acquire), }) diff --git a/crates/recording/src/output_pipeline/macos_fragmented_m4s.rs b/crates/recording/src/output_pipeline/macos_fragmented_m4s.rs index d20ba13196..d0c2307b98 100644 --- a/crates/recording/src/output_pipeline/macos_fragmented_m4s.rs +++ b/crates/recording/src/output_pipeline/macos_fragmented_m4s.rs @@ -217,10 +217,22 @@ impl Muxer for MacOSFragmentedM4SMuxer { } fn stop(&mut self) { - if let Some(state) = &self.state - && let Err(e) = state.video_tx.send(None) - { - trace!("M4S encoder channel already closed during stop: {e}"); + if let Some(state) = &self.state { + if state.video_tx.try_send(None).is_ok() { + return; + } + for _ in 0..5 { + std::thread::sleep(Duration::from_millis(50)); + match state.video_tx.try_send(None) { + Ok(()) => return, + Err(std::sync::mpsc::TrySendError::Disconnected(_)) => { + trace!("M4S encoder channel closed during stop retry"); + return; + } + Err(std::sync::mpsc::TrySendError::Full(_)) => {} + } + } + warn!("M4S encoder channel still full after retries, finish() will deliver sentinel"); } } @@ -679,10 +691,24 @@ impl Muxer for MacOSFragmentedM4SCameraMuxer { } fn stop(&mut self) { - if let Some(state) = &self.state - && let Err(e) = state.video_tx.send(None) - { - trace!("M4S camera encoder channel already closed during stop: {e}"); + if let Some(state) = &self.state { + if state.video_tx.try_send(None).is_ok() { + return; + } + for _ in 0..5 { + std::thread::sleep(Duration::from_millis(50)); + match state.video_tx.try_send(None) { + Ok(()) => return, + Err(std::sync::mpsc::TrySendError::Disconnected(_)) => { + trace!("M4S camera encoder channel closed during stop retry"); + return; + } + Err(std::sync::mpsc::TrySendError::Full(_)) => {} + } + } + warn!( + "M4S camera encoder channel still full after retries, finish() will deliver sentinel" + ); } } diff --git a/crates/recording/src/sources/screen_capture/macos.rs b/crates/recording/src/sources/screen_capture/macos.rs index 488343e12a..6f0b8e56d8 100644 --- a/crates/recording/src/sources/screen_capture/macos.rs +++ b/crates/recording/src/sources/screen_capture/macos.rs @@ -729,11 +729,21 @@ impl output_pipeline::VideoSource for VideoSource { "Capturer stopping after creating {} video frames", self.video_frame_counter.load(atomic::Ordering::Relaxed) ); - self.capturer.stop().await?; self.cancel_token.cancel(); - Ok(()) + let stop_result = + tokio::time::timeout(std::time::Duration::from_secs(5), self.capturer.stop()).await; + + match stop_result { + Ok(result) => result, + Err(_) => { + error!("Screen capturer stop timed out after 5s"); + Err(anyhow::anyhow!( + "Screen capturer stop timed out after 5s — native resources may not be fully released" + )) + } + } } .boxed() } diff --git a/crates/recording/tests/hardware_instant_recording.rs b/crates/recording/tests/hardware_instant_recording.rs index 8eae5bd509..daec15ce39 100644 --- a/crates/recording/tests/hardware_instant_recording.rs +++ b/crates/recording/tests/hardware_instant_recording.rs @@ -1,4 +1,7 @@ -use cap_enc_ffmpeg::remux::{get_media_duration, probe_media_valid, probe_video_can_decode}; +use cap_enc_ffmpeg::remux::{ + concatenate_m4s_segments_with_init, get_media_duration, merge_video_audio, + probe_m4s_can_decode_with_init, probe_media_valid, probe_video_can_decode, +}; use cap_recording::{ SendableShareableContent, feeds::microphone::MicrophoneFeed, instant_recording, sources::screen_capture::ScreenCaptureTarget, @@ -78,7 +81,7 @@ async fn instant_record_with_real_mic_and_screen() { let temp = TempDir::new().unwrap(); let recording_dir = temp.path().join("test_recording.cap"); - let recording_seconds = 3; + let recording_seconds = 15; eprintln!("Starting {recording_seconds}s instant recording..."); let mut builder = instant_recording::Actor::builder( @@ -91,7 +94,7 @@ async fn instant_record_with_real_mic_and_screen() { builder = builder.with_mic_feed(mic); } - let mut actor_handle = builder + let actor_handle = builder .build(Some(shareable_content)) .await .expect("Failed to spawn instant recording actor"); @@ -216,68 +219,195 @@ async fn instant_record_with_real_mic_and_screen() { } } - let output_path = content_dir.join("output.mp4"); + let segments_dir = content_dir.join("display"); + let init_path = segments_dir.join("init.mp4"); + assert!( + init_path.exists(), + "init.mp4 should exist in segments dir after stop" + ); + + let mut segment_files: Vec<_> = std::fs::read_dir(&segments_dir) + .expect("should read segments dir") + .filter_map(|e| { + let path = e.ok()?.path(); + if path.extension().is_some_and(|ext| ext == "m4s") { + Some(path) + } else { + None + } + }) + .collect(); + segment_files.sort(); + assert!( + !segment_files.is_empty(), + "at least one .m4s segment should exist after recording" + ); + eprintln!(" Video segment files on disk: {}", segment_files.len()); + + let total_segment_size: u64 = segment_files + .iter() + .filter_map(|p| std::fs::metadata(p).ok()) + .map(|m| m.len()) + .sum(); + eprintln!(" Total video segment size: {} bytes", total_segment_size); + assert!( + total_segment_size > 1000, + "segments should have substantial data, got {total_segment_size} bytes" + ); + + assert!( + probe_media_valid(&init_path), + "init.mp4 should have a valid container" + ); + + eprintln!("\n--- Segment decode verification ---"); + let first_decode = probe_m4s_can_decode_with_init(&init_path, &segment_files[0]); assert!( - output_path.exists(), - "output.mp4 should exist after stop (assembled locally)" + first_decode.as_ref().copied().unwrap_or(false), + "First video segment should be decodable with init: {first_decode:?}" ); + eprintln!(" First segment decodable: OK"); + + let last_segment = segment_files.last().unwrap(); + if last_segment != &segment_files[0] { + let last_decode = probe_m4s_can_decode_with_init(&init_path, last_segment); + assert!( + last_decode.as_ref().copied().unwrap_or(false), + "Last video segment should be decodable with init: {last_decode:?}" + ); + eprintln!(" Last segment decodable: OK"); + } - let output_size = std::fs::metadata(&output_path).unwrap().len(); - eprintln!("output.mp4 size: {} bytes", output_size); + eprintln!("\n--- Full video assembly & playback verification ---"); + let assembled_video = content_dir.join("test_assembled_video.mp4"); + concatenate_m4s_segments_with_init(&init_path, &segment_files, &assembled_video) + .expect("Video segment concatenation should succeed"); + assert!(assembled_video.exists(), "Assembled video MP4 should exist"); + let video_size = std::fs::metadata(&assembled_video).unwrap().len(); + eprintln!(" Assembled video size: {} bytes", video_size); assert!( - output_size > 1000, - "output.mp4 should have substantial data, got {output_size} bytes" + video_size > 1000, + "Assembled video should have substantial data" ); assert!( - probe_media_valid(&output_path), - "output.mp4 should have a valid container" + probe_media_valid(&assembled_video), + "Assembled video should be a valid container" ); assert!( - probe_video_can_decode(&output_path).unwrap_or(false), - "output.mp4 video stream should be decodable" + probe_video_can_decode(&assembled_video).unwrap_or(false), + "Assembled video should be decodable" ); + eprintln!(" Assembled video: valid container, decodable"); - let duration = get_media_duration(&output_path); - assert!(duration.is_some(), "should be able to read output duration"); - let dur_secs = duration.unwrap().as_secs_f64(); - eprintln!("output.mp4 duration: {dur_secs:.2}s (expected ~{recording_seconds}s)"); + let video_duration = get_media_duration(&assembled_video); assert!( - dur_secs > (recording_seconds as f64) * 0.5, - "duration ({dur_secs:.2}s) should be at least 50% of recording time ({recording_seconds}s)" + video_duration.is_some(), + "Should be able to read assembled video duration" ); + let video_dur_secs = video_duration.unwrap().as_secs_f64(); + eprintln!(" Video duration: {video_dur_secs:.2}s (expected ~{recording_seconds}s)"); assert!( - dur_secs < (recording_seconds as f64) * 2.0, - "duration ({dur_secs:.2}s) should be less than 2x recording time ({recording_seconds}s)" + video_dur_secs > (recording_seconds as f64) * 0.5, + "Video duration ({video_dur_secs:.2}s) should be at least 50% of recording time ({recording_seconds}s)" + ); + assert!( + video_dur_secs < (recording_seconds as f64) * 2.0, + "Video duration ({video_dur_secs:.2}s) should be less than 2x recording time ({recording_seconds}s)" ); - let input = - ffmpeg::format::input(&output_path).expect("should open output.mp4 for stream probing"); - let has_video = input + let input_ctx = + ffmpeg::format::input(&assembled_video).expect("Should open assembled video for probing"); + let has_video = input_ctx .streams() .any(|s| s.parameters().medium() == ffmpeg::media::Type::Video); - let has_audio = input - .streams() - .any(|s| s.parameters().medium() == ffmpeg::media::Type::Audio); + assert!(has_video, "Assembled video must contain a video stream"); - assert!(has_video, "output.mp4 must contain a video stream"); if has_mic { + eprintln!("\n--- Audio assembly & A/V sync verification ---"); + let audio_dir = content_dir.join("audio"); + let audio_init = audio_dir.join("init.mp4"); + assert!( + audio_init.exists(), + "Audio init.mp4 should exist when mic is connected" + ); + + let mut audio_segments: Vec<_> = std::fs::read_dir(&audio_dir) + .expect("should read audio dir") + .filter_map(|e| { + let path = e.ok()?.path(); + if path.extension().is_some_and(|ext| ext == "m4s") { + Some(path) + } else { + None + } + }) + .collect(); + audio_segments.sort(); + eprintln!(" Audio segment files: {}", audio_segments.len()); + + let assembled_audio = content_dir.join("test_assembled_audio.m4a"); + concatenate_m4s_segments_with_init(&audio_init, &audio_segments, &assembled_audio) + .expect("Audio segment concatenation should succeed"); + assert!( + probe_media_valid(&assembled_audio), + "Assembled audio should be a valid container" + ); + + let audio_duration = get_media_duration(&assembled_audio); assert!( - has_audio, - "output.mp4 must contain an audio stream when mic was connected" + audio_duration.is_some(), + "Should be able to read assembled audio duration" ); + let audio_dur_secs = audio_duration.unwrap().as_secs_f64(); + eprintln!(" Audio duration: {audio_dur_secs:.2}s (expected ~{recording_seconds}s)"); + assert!( + audio_dur_secs > (recording_seconds as f64) * 0.5, + "Audio duration ({audio_dur_secs:.2}s) should be at least 50% of recording time" + ); + + let av_drift = (video_dur_secs - audio_dur_secs).abs(); + eprintln!(" A/V duration drift: {av_drift:.3}s"); + assert!( + av_drift < 1.0, + "A/V drift ({av_drift:.3}s) should be less than 1 second" + ); + + let merged_output = content_dir.join("test_merged_av.mp4"); + merge_video_audio(&assembled_video, &assembled_audio, &merged_output) + .expect("Video + audio merge should succeed"); + assert!( + probe_media_valid(&merged_output), + "Merged A/V file should be a valid container" + ); + assert!( + probe_video_can_decode(&merged_output).unwrap_or(false), + "Merged A/V file should be decodable" + ); + + let merged_ctx = + ffmpeg::format::input(&merged_output).expect("Should open merged file for probing"); + let has_merged_video = merged_ctx + .streams() + .any(|s| s.parameters().medium() == ffmpeg::media::Type::Video); + let has_merged_audio = merged_ctx + .streams() + .any(|s| s.parameters().medium() == ffmpeg::media::Type::Audio); + assert!(has_merged_video, "Merged file must have video stream"); + assert!(has_merged_audio, "Merged file must have audio stream"); + eprintln!(" Merged A/V: valid, decodable, both streams present"); + eprintln!(" A/V sync: PASS (drift {av_drift:.3}s < 1.0s)"); } - eprintln!("Streams: video={has_video}, audio={has_audio}"); match &completed.health { cap_recording::RecordingHealth::Healthy => { - eprintln!("Recording health: HEALTHY"); + eprintln!("\nRecording health: HEALTHY"); } cap_recording::RecordingHealth::Repaired { original_issue } => { - eprintln!("Recording health: REPAIRED (was: {original_issue})"); + eprintln!("\nRecording health: REPAIRED (was: {original_issue})"); } cap_recording::RecordingHealth::Degraded { issues } => { - eprintln!("Recording health: DEGRADED - {issues:?}"); + eprintln!("\nRecording health: DEGRADED - {issues:?}"); } cap_recording::RecordingHealth::Damaged { reason } => { panic!("Recording health is DAMAGED: {reason}"); @@ -295,10 +425,8 @@ async fn instant_record_with_real_mic_and_screen() { .map(|(name, _, _)| name) .unwrap_or_else(|| "none".to_string()) ); - eprintln!(" Duration: {dur_secs:.2}s"); - eprintln!(" Video segments: {}", complete_segments.len()); - eprintln!(" Output size: {} bytes", output_size); - eprintln!(" Has video: {has_video}"); - eprintln!(" Has audio: {has_audio}"); + eprintln!(" Video duration: {video_dur_secs:.2}s"); + eprintln!(" Video segments: {}", segment_files.len()); + eprintln!(" Total segment size: {} bytes", total_segment_size); eprintln!(" Health: {:?}", completed.health); }