Skip to content

add asp commands#2001

Draft
nickpoindexter wants to merge 1 commit into
mongodb:mainfrom
nickpoindexter:add_asp_commands
Draft

add asp commands#2001
nickpoindexter wants to merge 1 commit into
mongodb:mainfrom
nickpoindexter:add_asp_commands

Conversation

@nickpoindexter

Copy link
Copy Markdown

Summary

Adds first-class driver support for Atlas Stream Processing (ASP), implementing the ASP driver spec. Users currently have to drop down to
client.GetDatabase("admin").RunCommand(...) against a workspace endpoint; this PR adds a dedicated client / handles layer matching what's already shipped for PHP, Rust, and Ruby.

What's new

Public API (src/MongoDB.Driver/StreamProcessing/)

  • StreamProcessingClient — workspace-scoped client distinct from MongoClient. Validates the workspace URI (atlas-stream-*.<region>.a.query.mongodb.net and the .mongodb-<env>.net staging variants), enforces TLS, defaults authSource to
    admin. Implements IDisposable and disposes the underlying MongoClient. Static StreamProcessingClient.IsWorkspaceUri(uri) exposed for callers that want to gate before connecting.
  • StreamProcessorsCreate / Get / GetInfo (plus Async pairs) for managing processors in a workspace.
  • StreamProcessorStart / Stop / Drop / Stats / Samples (plus Async pairs) for a named processor.
  • StreamProcessorInfo — typed accessor over the getStreamProcessor response. Fields the spec marks as optional (e.g. Id, PipelineVersion) are Nullable<T> / nullable reference types.
  • StreamProcessorSamples — result of Samples / SamplesAsync, exposes CursorId, Documents, IsExhausted.
  • Options: CreateStreamProcessorOptions, StartStreamProcessorOptions, FailoverOptions, GetStreamProcessorStatsOptions, GetStreamProcessorSamplesOptions. POCO classes following the existing options-class convention.

Wire commands
All 8 ASP commands routed through IMongoDatabase.RunCommand / RunCommandAsync against the admin database: createStreamProcessor, startStreamProcessor, stopStreamProcessor, dropStreamProcessor, getStreamProcessor,
getStreamProcessorStats, startSampleStreamProcessor, getMoreSampleStreamProcessor.

Notable spec / server alignment

  • startAfter (in StartStreamProcessorOptions) is intentionally not exposed — the spec marks it RESERVED for future use and explicitly forbids drivers from sending it.
  • Dev-server response shape deviations are accommodated, matching the workarounds in the PHP / Rust / Ruby PRs:
    • StreamProcessors.GetInfo unwraps a top-level result wrapper when the server returns { ok: 1, result: { … } }.
    • StreamProcessor.Samples accepts both nextBatch (spec) and messages (current dev server).
    • StreamProcessorInfo.Id and PipelineVersion return null when the server omits them.
  • Samples is a single-call dispatch: null/zero CursorIdstartSampleStreamProcessor (returns StreamProcessorSamples with cursor id, empty docs); non-zero → getMoreSampleStreamProcessor. Callers stop when CursorId == 0 (or use
    the IsExhausted predicate).
  • State strings returned as plain strings, not enums — per the spec, drivers MUST surface unknown state values as-is rather than mapping to a closed set.
  • Internal-only fields (tenantID, projectId, processorId) are not surfaced in the public API.

Test plan

  • dotnet build CSharpDriver.sln — 0 errors (14 pre-existing warnings: SharpCompress vulnerability + netcoreapp3.1 TFM support notices, neither introduced by this PR)
  • dotnet test tests/MongoDB.Driver.Tests/MongoDB.Driver.Tests.csproj -f net10.0 --filter "FullyQualifiedName~StreamProcessing" — all unit tests pass (URI detection for prod / staging / creds-and-port / case-insensitivity / four reject
    cases; constructor rejection paths; full StreamProcessorInfo getter coverage including defaults and error fields; StreamProcessorSamples predicates)
  • dotnet test ... --filter "FullyQualifiedName~StreamProcessingLifecycle" (no env var) — self-skips cleanly
  • Full lifecycle exercised against an Atlas staging workspace via tests/MongoDB.Driver.Examples/StreamProcessingExample.cs — create → start → stats → samples (cursor opened + telemetry docs returned in second call) → stop → drop. All
    transitions clean.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant