diff --git a/aws/client/aws-client-awsjson/src/main/java/software/amazon/smithy/java/aws/client/awsjson/AwsJsonProtocol.java b/aws/client/aws-client-awsjson/src/main/java/software/amazon/smithy/java/aws/client/awsjson/AwsJsonProtocol.java index 55abd30feb..6ab5259f4e 100644 --- a/aws/client/aws-client-awsjson/src/main/java/software/amazon/smithy/java/aws/client/awsjson/AwsJsonProtocol.java +++ b/aws/client/aws-client-awsjson/src/main/java/software/amazon/smithy/java/aws/client/awsjson/AwsJsonProtocol.java @@ -141,6 +141,6 @@ private EventDecoderFactory getEventDecoderFactory(ApiOperation subscriber) { + dataStream.subscribe(subscriber); + } +} diff --git a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpClientReplayableByteBufferPublisher.java b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpClientReplayableByteBufferPublisher.java deleted file mode 100644 index 53fcbb299e..0000000000 --- a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpClientReplayableByteBufferPublisher.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0 - */ - -package software.amazon.smithy.java.client.http; - -import java.net.http.HttpRequest; -import java.nio.ByteBuffer; -import java.util.concurrent.Flow; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Fast path {@link HttpRequest.BodyPublisher} for replayable in-memory {@link ByteBuffer} bodies. - * - *

Publishes the body in a single {@code onNext} without forcing a {@code byte[]} copy or an - * extra publisher wrapper. - */ -final class JavaHttpClientReplayableByteBufferPublisher implements HttpRequest.BodyPublisher { - private final ByteBuffer body; - - JavaHttpClientReplayableByteBufferPublisher(ByteBuffer body) { - this.body = body.asReadOnlyBuffer(); - } - - @Override - public long contentLength() { - return body.remaining(); - } - - @Override - public void subscribe(Flow.Subscriber subscriber) { - subscriber.onSubscribe(new Flow.Subscription() { - private final AtomicBoolean completed = new AtomicBoolean(); - - @Override - public void request(long n) { - if (n <= 0 || !completed.compareAndSet(false, true)) { - return; - } else { - subscriber.onNext(body.duplicate()); - subscriber.onComplete(); - } - } - - @Override - public void cancel() { - completed.set(true); - } - }); - } -} diff --git a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpClientSmallBodySubscriber.java b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpClientSmallBodySubscriber.java deleted file mode 100644 index 0867f8a683..0000000000 --- a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpClientSmallBodySubscriber.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0 - */ - -package software.amazon.smithy.java.client.http; - -import java.net.http.HttpHeaders; -import java.net.http.HttpResponse; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.Flow; -import software.amazon.smithy.java.io.datastream.DataStream; - -/** - * Small-body subscriber for known-length responses that fit in memory cheaply. - * - *

Allocates one array up front and completes with a replayable {@link DataStream}. - */ -final class JavaHttpClientSmallBodySubscriber implements HttpResponse.BodySubscriber { - private final String contentType; - private final byte[] bytes; - private int position; - private final CompletableFuture body = new CompletableFuture<>(); - - JavaHttpClientSmallBodySubscriber(HttpHeaders headers, int contentLength) { - this.contentType = headers.firstValue("content-type").orElse(null); - this.bytes = new byte[Math.max(contentLength, 0)]; - } - - @Override - public CompletionStage getBody() { - return body; - } - - @Override - public void onSubscribe(Flow.Subscription subscription) { - // This path is only used for known small bodies, so the full response is already - // bounded by the fast-path threshold and can be aggregated safely in memory. - subscription.request(Long.MAX_VALUE); - } - - @Override - public void onNext(List item) { - for (ByteBuffer buffer : item) { - int remaining = buffer.remaining(); - buffer.get(bytes, position, remaining); - position += remaining; - } - } - - @Override - public void onError(Throwable throwable) { - body.completeExceptionally(throwable); - } - - @Override - public void onComplete() { - byte[] result = position == bytes.length ? bytes : Arrays.copyOf(bytes, position); - body.complete(DataStream.ofBytes(result, contentType)); - } -} diff --git a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpClientTransport.java b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpClientTransport.java index 2a696b3ec7..dad5661833 100644 --- a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpClientTransport.java +++ b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpClientTransport.java @@ -5,8 +5,6 @@ package software.amazon.smithy.java.client.http; -import static java.net.http.HttpRequest.BodyPublisher; -import static java.net.http.HttpRequest.BodyPublishers; import static java.net.http.HttpResponse.BodyHandler; import static java.net.http.HttpResponse.BodySubscriber; import static java.net.http.HttpResponse.BodySubscribers; @@ -14,7 +12,6 @@ import static software.amazon.smithy.java.http.api.HttpHeaders.HeaderWithValueConsumer; import java.io.IOException; -import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpConnectTimeoutException; import java.nio.ByteBuffer; @@ -44,13 +41,12 @@ */ public final class JavaHttpClientTransport implements ClientTransport { - private static final URI DUMMY_URI = URI.create("http://localhost"); - - private static final int SMALL_RESPONSE_BODY_FAST_PATH_THRESHOLD = 64 * 1024; + private static final int SMALL_RESPONSE_BODY_FAST_PATH_THRESHOLD = 1024 * 256; // 256 KB + // Drop content-length private static final HeaderWithValueConsumer VALUE_CONSUMER = (b, n, v) -> { if (n != HeaderName.CONTENT_LENGTH.name()) { - b.setHeader(n, v); + b.header(n, v); } }; @@ -61,7 +57,8 @@ public final class JavaHttpClientTransport implements ClientTransportKnown-empty bodies use {@link BodyPublishers#noBody()}, replayable in-memory - * {@link ByteBuffer} bodies use {@link JavaHttpClientReplayableByteBufferPublisher}, and - * everything else falls back to {@link DataStream#bodyPublisher()}. - * - *

Request timeout precedence: context value takes priority over the transport-level - * {@link #defaultRequestTimeout}; if neither is set the JDK applies no timeout. */ private java.net.http.HttpRequest createJavaRequest(Context context, HttpRequest request) { - DataStream requestBody = request.body(); - BodyPublisher bodyPublisher; - ByteBuffer replayableRequestBody = toReplayableBodyBuffer(requestBody); - - if (replayableRequestBody != null) { - bodyPublisher = !replayableRequestBody.hasRemaining() - ? BodyPublishers.noBody() - : new JavaHttpClientReplayableByteBufferPublisher(replayableRequestBody); - } else if (requestBody.hasKnownLength()) { - bodyPublisher = requestBody.contentLength() == 0 - ? BodyPublishers.noBody() - : BodyPublishers.fromPublisher(requestBody, requestBody.contentLength()); - } else { - bodyPublisher = BodyPublishers.fromPublisher(requestBody); - } - var httpRequestBuilder = java.net.http.HttpRequest.newBuilder() .version(smithyToHttpVersion(request.httpVersion())) - .method(request.method(), bodyPublisher) + .method(request.method(), new DataStreamBodyPublisher(request.body())) .uri(request.uri().toURI()); - Duration requestTimeout = context.get(HttpContext.HTTP_REQUEST_TIMEOUT); - if (requestTimeout == null) { - requestTimeout = defaultRequestTimeout; - } + Duration requestTimeout = context.getOrDefault(HttpContext.HTTP_REQUEST_TIMEOUT, defaultRequestTimeout); if (requestTimeout != null) { httpRequestBuilder.timeout(requestTimeout); } @@ -199,26 +158,10 @@ private java.net.http.HttpRequest createJavaRequest(Context context, HttpRequest return httpRequestBuilder.build(); } - /** - * Return the body as a {@link ByteBuffer} when it can be published directly; otherwise - * return {@code null}. - */ - private static ByteBuffer toReplayableBodyBuffer(DataStream requestBody) { - if (!requestBody.isReplayable() || !requestBody.hasKnownLength() || !requestBody.hasByteBuffer()) { - return null; - } - - try { - return requestBody.asByteBuffer().asReadOnlyBuffer(); - } catch (UnsupportedOperationException | IllegalStateException e) { - return null; - } - } - private HttpResponse sendRequest(java.net.http.HttpRequest request) { java.net.http.HttpResponse res = null; try { - res = client.send(request, new ResponseBodyHandler()); + res = client.send(request, ResponseBodyHandler.INSTANCE); return createSmithyResponse(res); } catch (IOException | InterruptedException | RuntimeException e) { if (res != null) { @@ -241,15 +184,13 @@ private HttpResponse sendRequest(java.net.http.HttpRequest request) { // package-private for testing HttpResponse createSmithyResponse(java.net.http.HttpResponse response) { + LOGGER.trace("Got response: {}", response); var headers = new JavaHttpHeaders(response.headers()); - LOGGER.trace("Got response: {}; headers: {}", response, response.headers().map()); - var length = headers.contentLength(); var adaptedLength = length == null ? -1 : length; var contentType = headers.contentType(); var body = DataStream.withMetadata(response.body(), contentType, adaptedLength, response.body().isReplayable()); - - return new JavaHttpResponse(javaToSmithyVersion(response.version()), response.statusCode(), headers, body); + return HttpResponse.of(javaToSmithyVersion(response.version()), response.statusCode(), headers, body); } private static HttpClient.Version smithyToHttpVersion(HttpVersion version) { @@ -313,23 +254,22 @@ public MessageExchange messageExchange() { /** * Picks a {@link BodySubscriber} implementation based on the advertised response size. * - *

Small-body fast path ({@link JavaHttpClientSmallBodySubscriber}): when - * {@code Content-Length} is present and within {@link #SMALL_RESPONSE_BODY_FAST_PATH_THRESHOLD} - * bytes, we pre-size a {@code byte[]} and accumulate the response fully before handing it - * back. The result is a replayable {@link DataStream} with known length, avoiding the producer/consumer hand-off - * of the streaming path. - * - *

If the fast path is not taken, the transport falls back to the JDK's built-in - * {@code ofInputStream()} body subscriber and wraps the returned stream as a Smithy - * {@link DataStream}. + *

Small-body fast path ({@link ZeroCopyBodySubscriber}): when {@code Content-Length} is present and within + * {@link #SMALL_RESPONSE_BODY_FAST_PATH_THRESHOLD}. Otherwise, falls back to the JDK's built-in + * {@code ofInputStream()} body subscriber. */ private static final class ResponseBodyHandler implements BodyHandler { + static final ResponseBodyHandler INSTANCE = new ResponseBodyHandler(); + @Override public BodySubscriber apply(ResponseInfo responseInfo) { long contentLength = responseInfo.headers().firstValueAsLong("content-length").orElse(-1L); - if (contentLength >= 0 && contentLength <= SMALL_RESPONSE_BODY_FAST_PATH_THRESHOLD) { - return new JavaHttpClientSmallBodySubscriber(responseInfo.headers(), (int) contentLength); + if (contentLength == 0) { + String contentType = responseInfo.headers().firstValue("content-type").orElse(null); + return BodySubscribers.replacing(DataStream.ofBytes(new byte[0], contentType)); + } else if (contentLength >= 0 && contentLength <= SMALL_RESPONSE_BODY_FAST_PATH_THRESHOLD) { + return new ZeroCopyBodySubscriber(responseInfo.headers(), contentLength); } String contentType = responseInfo.headers().firstValue("content-type").orElse(null); diff --git a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpHeaders.java b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpHeaders.java index 0e40d4cea0..64f822b700 100644 --- a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpHeaders.java +++ b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpHeaders.java @@ -5,18 +5,20 @@ package software.amazon.smithy.java.client.http; -import java.util.ArrayList; -import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.function.BiConsumer; import software.amazon.smithy.java.http.api.HeaderName; import software.amazon.smithy.java.http.api.HttpHeaders; +/** + * Adapter that exposes a JDK {@link java.net.http.HttpHeaders} as a smithy-java {@link HttpHeaders}. + */ final class JavaHttpHeaders implements HttpHeaders { private final java.net.http.HttpHeaders headers; private volatile Map> materialized; - private volatile int size = -1; + private int size = -1; // cheap enough to recompute JavaHttpHeaders(java.net.http.HttpHeaders headers) { this.headers = headers; @@ -24,12 +26,12 @@ final class JavaHttpHeaders implements HttpHeaders { @Override public List allValues(String name) { - return allValuesCanonical(HeaderName.canonicalize(name)); + return headers.allValues(name); } @Override public List allValues(HeaderName name) { - return allValuesCanonical(name.name()); + return headers.allValues(name.name()); } @Override @@ -40,56 +42,62 @@ public int size() { } result = 0; - for (List values : headers.map().values()) { - result += values.size(); + for (var entry : headers.map().entrySet()) { + String k = entry.getKey(); + // pseudo-headers are excluded + if (k.charAt(0) != ':') { + result += entry.getValue().size(); + } } + size = result; return result; } @Override public Map> map() { - Map> result = materialized; - if (result != null) { - return result; - } + return materialize(); + } - Map> grouped = new LinkedHashMap<>(); - for (var entry : headers.map().entrySet()) { - String canonical = HeaderName.canonicalize(entry.getKey()); - if (canonical.equals(":status")) { - continue; - } - List values = grouped.get(canonical); - if (values == null) { - values = new ArrayList<>(entry.getValue().size()); - grouped.put(canonical, values); + @Override + public void forEachEntry(BiConsumer consumer) { + for (var entry : materialize().entrySet()) { + for (var value : entry.getValue()) { + consumer.accept(entry.getKey(), value); } - values.addAll(entry.getValue()); } - materialized = grouped; - return grouped; } - private List allValuesCanonical(String canonical) { - Map> cached = materialized; - if (cached != null) { - return cached.getOrDefault(canonical, Collections.emptyList()); + @Override + public void forEachEntry(C contextValue, HeaderWithValueConsumer consumer) { + for (var entry : materialize().entrySet()) { + for (var value : entry.getValue()) { + consumer.accept(contextValue, entry.getKey(), value); + } } + } - if (canonical.equals(":status")) { - return Collections.emptyList(); + /** + * Lazy single-shot canonicalization of the JDK headers. The JDK guarantees one entry per + * case-insensitive name (its internal map uses {@code String.CASE_INSENSITIVE_ORDER}), so + * we just lowercase each key and reuse the JDK's already-immutable value list by reference. + */ + private Map> materialize() { + var result = materialized; + if (result != null) { + return result; } - List values = null; + var grouped = new LinkedHashMap>(headers.map().size()); for (var entry : headers.map().entrySet()) { - if (HeaderName.canonicalize(entry.getKey()).equals(canonical)) { - if (values == null) { - values = new ArrayList<>(entry.getValue().size()); - } - values.addAll(entry.getValue()); + String key = entry.getKey(); + // pseudo-headers are excluded + if (key.charAt(0) != ':') { + grouped.put(HeaderName.canonicalize(key), entry.getValue()); } } - return values == null ? Collections.emptyList() : values; + + materialized = grouped; + return grouped; } } diff --git a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpResponse.java b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpResponse.java deleted file mode 100644 index 9a48150095..0000000000 --- a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpResponse.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0 - */ - -package software.amazon.smithy.java.client.http; - -import software.amazon.smithy.java.http.api.HttpHeaders; -import software.amazon.smithy.java.http.api.HttpResponse; -import software.amazon.smithy.java.http.api.HttpVersion; -import software.amazon.smithy.java.http.api.ModifiableHttpResponse; -import software.amazon.smithy.java.io.datastream.DataStream; - -record JavaHttpResponse( - HttpVersion httpVersion, - int statusCode, - HttpHeaders headers, - DataStream body) implements HttpResponse { - - @Override - public HttpResponse toUnmodifiable() { - return this; - } - - @Override - public ModifiableHttpResponse toModifiable() { - return toModifiableCopy(); - } - - @Override - public ModifiableHttpResponse toModifiableCopy() { - return HttpResponse.create() - .setHttpVersion(httpVersion) - .setStatusCode(statusCode) - .setHeaders(headers.toModifiable()) - .setBody(body); - } -} diff --git a/client/client-http/src/main/java/software/amazon/smithy/java/client/http/ZeroCopyBodySubscriber.java b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/ZeroCopyBodySubscriber.java new file mode 100644 index 0000000000..97318f53dd --- /dev/null +++ b/client/client-http/src/main/java/software/amazon/smithy/java/client/http/ZeroCopyBodySubscriber.java @@ -0,0 +1,66 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.client.http; + +import java.net.http.HttpHeaders; +import java.net.http.HttpResponse; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Flow; +import software.amazon.smithy.java.io.datastream.DataStream; + +/** + * Zero-copy body subscriber for known-length responses that fit in memory. + * + *

Per the {@link HttpResponse.BodySubscriber} contract: "both the Lists and the ByteBuffers, once passed to + * the subscriber, are no longer used by the HTTP Client." That means we can hold direct references to the chunks + * the JDK delivers and hand them to downstream code without copying. The completed {@link DataStream} keeps the chunks + * intact and only stitches into a contiguous buffer if a caller specifically asks for one via {@code asByteBuffer()}. + */ +final class ZeroCopyBodySubscriber implements HttpResponse.BodySubscriber { + private final String contentType; + private final long contentLength; + private final List chunks; + private final CompletableFuture body = new CompletableFuture<>(); + + ZeroCopyBodySubscriber(HttpHeaders headers, long contentLength) { + this.contentType = headers.firstValue("content-type").orElse(null); + this.contentLength = contentLength; + // Most responses arrive in 1–4 chunks for our small-body threshold; size the + // backing array accordingly to skip the default-capacity grow-and-copy. + this.chunks = new ArrayList<>(4); + } + + @Override + public CompletionStage getBody() { + return body; + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + // Only used for known-length bodies bounded by the small-body threshold, so we can + // safely drain everything without backpressure. + subscription.request(Long.MAX_VALUE); + } + + @Override + public void onNext(List item) { + chunks.addAll(item); + } + + @Override + public void onError(Throwable throwable) { + body.completeExceptionally(throwable); + } + + @Override + public void onComplete() { + body.complete(DataStream.ofByteBuffers(chunks, contentLength, contentType)); + } +} diff --git a/client/client-http/src/test/java/software/amazon/smithy/java/client/http/JavaHttpClientTest.java b/client/client-http/src/test/java/software/amazon/smithy/java/client/http/JavaHttpClientTest.java index dbe11e7532..36f30234aa 100644 --- a/client/client-http/src/test/java/software/amazon/smithy/java/client/http/JavaHttpClientTest.java +++ b/client/client-http/src/test/java/software/amazon/smithy/java/client/http/JavaHttpClientTest.java @@ -14,7 +14,6 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertTrue; -import java.io.InputStream; import java.net.Authenticator; import java.net.CookieHandler; import java.net.ProxySelector; @@ -168,7 +167,6 @@ public URI uri() { var transport = new JavaHttpClientTransport(); var response = transport.createSmithyResponse(fakeResponse); - assertInstanceOf(JavaHttpResponse.class, response); assertInstanceOf(JavaHttpHeaders.class, response.headers()); assertFalse(response.headers().map().containsKey(":status"), "Response headers should not contain :status pseudo-header"); @@ -176,7 +174,10 @@ public URI uri() { } @Test - public void usesPublisherFastPathForKnownLengthBodies() { + public void publishesBodyViaSubscribeForKnownLengthBodies() { + // The transport delegates body publication to DataStream.subscribe() through + // DataStreamBodyPublisher; per-DataStream-impl subscribe overrides (e.g., + // ByteBufferDataStream's zero-copy single-onNext) carry the optimization. byte[] payload = new byte[128 * 1024]; Arrays.fill(payload, (byte) 'a'); var client = new CapturingHttpClient(); @@ -184,47 +185,7 @@ public void usesPublisherFastPathForKnownLengthBodies() { var request = software.amazon.smithy.java.http.api.HttpRequest.create() .setUri(URI.create("http://localhost/test")) .setMethod("POST") - .setBody(new DataStream() { - @Override - public long contentLength() { - return payload.length; - } - - @Override - public String contentType() { - return null; - } - - @Override - public boolean isReplayable() { - return true; - } - - @Override - public boolean isAvailable() { - return true; - } - - @Override - public InputStream asInputStream() { - throw new AssertionError("asInputStream should not be called"); - } - - @Override - public ByteBuffer asByteBuffer() { - return ByteBuffer.wrap(payload); - } - - @Override - public boolean hasByteBuffer() { - return true; - } - - @Override - public void subscribe(Flow.Subscriber subscriber) { - throw new AssertionError("subscribe should not be called"); - } - }) + .setBody(DataStream.ofBytes(payload)) .toUnmodifiable(); try (var response = transport.send(Context.create(), request)) { diff --git a/client/client-rpcv2/src/main/java/software/amazon/smithy/java/client/rpcv2/AbstractRpcV2ClientProtocol.java b/client/client-rpcv2/src/main/java/software/amazon/smithy/java/client/rpcv2/AbstractRpcV2ClientProtocol.java index fa322f05ce..e4f33e5c1b 100644 --- a/client/client-rpcv2/src/main/java/software/amazon/smithy/java/client/rpcv2/AbstractRpcV2ClientProtocol.java +++ b/client/client-rpcv2/src/main/java/software/amazon/smithy/java/client/rpcv2/AbstractRpcV2ClientProtocol.java @@ -155,7 +155,7 @@ public O deserializ private static DataStream bodyDataStream(HttpResponse response) { var contentType = response.headers().contentType(); var contentLength = response.headers().contentLength(); - return DataStream.withMetadata(response.body(), contentType, contentLength, null); + return DataStream.withMetadata(response.body(), contentType, contentLength == null ? -1 : contentLength); } private DataStream getBody(SerializableStruct input) { diff --git a/codecs/json-codec/src/main/java/software/amazon/smithy/java/json/jackson/JacksonJsonSerdeProvider.java b/codecs/json-codec/src/main/java/software/amazon/smithy/java/json/jackson/JacksonJsonSerdeProvider.java index a05d179b88..b219b7d0f6 100644 --- a/codecs/json-codec/src/main/java/software/amazon/smithy/java/json/jackson/JacksonJsonSerdeProvider.java +++ b/codecs/json-codec/src/main/java/software/amazon/smithy/java/json/jackson/JacksonJsonSerdeProvider.java @@ -66,11 +66,22 @@ public ShapeDeserializer newDeserializer( @Override public ShapeDeserializer newDeserializer(ByteBuffer source, JsonSettings settings) { - int offset = source.arrayOffset() + source.position(); - int length = source.remaining(); var ctx = readCtx(settings); try { - return new JacksonJsonDeserializer(FACTORY.createParser(ctx, source.array(), offset, length), settings); + byte[] array; + int offset; + int length = source.remaining(); + if (source.hasArray()) { + array = source.array(); + offset = source.arrayOffset() + source.position(); + } else { + // Direct or read-only buffer: copy via duplicate() so we don't disturb the caller's position. + var dup = source.duplicate(); + array = new byte[length]; + dup.get(array); + offset = 0; + } + return new JacksonJsonDeserializer(FACTORY.createParser(ctx, array, offset, length), settings); } catch (JacksonException e) { throw new SerializationException(e); } diff --git a/http/http-api/src/main/java/software/amazon/smithy/java/http/api/HeaderName.java b/http/http-api/src/main/java/software/amazon/smithy/java/http/api/HeaderName.java index 73a1bddbc6..561996e88a 100644 --- a/http/http-api/src/main/java/software/amazon/smithy/java/http/api/HeaderName.java +++ b/http/http-api/src/main/java/software/amazon/smithy/java/http/api/HeaderName.java @@ -90,6 +90,7 @@ public final class HeaderName { public static final HeaderName AMZ_SDK_REQUEST = HeaderName.builtin("amz-sdk-request"); public static final HeaderName X_AMZN_REQUESTID = HeaderName.builtin("x-amzn-requestid"); public static final HeaderName X_AMZN_TRACE_ID = HeaderName.builtin("x-amzn-trace-id"); + public static final HeaderName X_AMZ_CONTENT_SHA256 = HeaderName.builtin("x-amz-content-sha256"); public static final HeaderName X_AMZ_DATE = HeaderName.builtin("x-amz-date"); public static final HeaderName X_AMZ_REQUEST_ID = HeaderName.builtin("x-amz-request-id"); public static final HeaderName X_AMZ_SECURITY_TOKEN = HeaderName.builtin("x-amz-security-token"); diff --git a/http/http-api/src/main/java/software/amazon/smithy/java/http/api/HttpHeaders.java b/http/http-api/src/main/java/software/amazon/smithy/java/http/api/HttpHeaders.java index cadeedfd68..611baa7589 100644 --- a/http/http-api/src/main/java/software/amazon/smithy/java/http/api/HttpHeaders.java +++ b/http/http-api/src/main/java/software/amazon/smithy/java/http/api/HttpHeaders.java @@ -207,7 +207,7 @@ default ModifiableHttpHeaders toModifiable() { } else { ModifiableHttpHeaders copy = new ArrayHttpHeaders(size()); for (var e : map().entrySet()) { - copy.addHeader(e.getKey(), e.getValue()); + copy.setHeader(e.getKey(), e.getValue()); } return copy; } diff --git a/http/http-api/src/main/java/software/amazon/smithy/java/http/api/HttpResponse.java b/http/http-api/src/main/java/software/amazon/smithy/java/http/api/HttpResponse.java index 0c0b57d85b..2753853c9f 100644 --- a/http/http-api/src/main/java/software/amazon/smithy/java/http/api/HttpResponse.java +++ b/http/http-api/src/main/java/software/amazon/smithy/java/http/api/HttpResponse.java @@ -5,6 +5,8 @@ package software.amazon.smithy.java.http.api; +import software.amazon.smithy.java.io.datastream.DataStream; + /** * HTTP response. */ @@ -45,4 +47,17 @@ public interface HttpResponse extends HttpMessage { static ModifiableHttpResponse create() { return new ModifiableHttpResponseImpl(); } + + /** + * Create an unmodifiable {@link HttpResponse} from its four fields directly. + * + * @param httpVersion the HTTP version of the response. + * @param statusCode the response status code. + * @param headers the response headers (must already be unmodifiable). + * @param body the response body. + * @return the constructed unmodifiable response. + */ + static HttpResponse of(HttpVersion httpVersion, int statusCode, HttpHeaders headers, DataStream body) { + return new HttpResponseImpl(httpVersion, statusCode, headers, body); + } } diff --git a/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/ResponseDeserializer.java b/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/ResponseDeserializer.java index 2bb33b331c..c599af0046 100644 --- a/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/ResponseDeserializer.java +++ b/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/ResponseDeserializer.java @@ -67,7 +67,7 @@ public ResponseDeserializer response(HttpResponse response) { private DataStream bodyDataStream(HttpResponse response) { var contentType = response.headers().contentType(); var contentLength = response.headers().contentLength(); - return DataStream.withMetadata(response.body(), contentType, contentLength, null); + return DataStream.withMetadata(response.body(), contentType, contentLength == null ? -1 : contentLength); } /** diff --git a/io/src/main/java/software/amazon/smithy/java/io/datastream/ByteBufferDataStream.java b/io/src/main/java/software/amazon/smithy/java/io/datastream/ByteBufferDataStream.java index df38f5b27d..9b809cccc8 100644 --- a/io/src/main/java/software/amazon/smithy/java/io/datastream/ByteBufferDataStream.java +++ b/io/src/main/java/software/amazon/smithy/java/io/datastream/ByteBufferDataStream.java @@ -8,9 +8,9 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.net.http.HttpRequest; import java.nio.ByteBuffer; import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicBoolean; import software.amazon.smithy.java.io.ByteBufferUtils; final class ByteBufferDataStream implements DataStream { @@ -20,9 +20,6 @@ final class ByteBufferDataStream implements DataStream { private final long contentLength; ByteBufferDataStream(ByteBuffer buffer, String contentType) { - if (!buffer.hasArray()) { - throw new IllegalArgumentException("Only ByteBuffers with an accessible byte array are supported"); - } this.buffer = buffer; this.contentLength = buffer.remaining(); this.contentType = contentType; @@ -57,7 +54,14 @@ public InputStream asInputStream() { @Override public void writeTo(OutputStream out) throws IOException { - out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); + if (buffer.hasArray()) { + out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); + } else { + var dup = buffer.duplicate(); + var tmp = new byte[dup.remaining()]; + dup.get(tmp); + out.write(tmp); + } } @Override @@ -77,8 +81,34 @@ public boolean hasKnownLength() { @Override public void subscribe(Flow.Subscriber subscriber) { - HttpRequest.BodyPublishers - .ofByteArray(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()) - .subscribe(subscriber); + if (contentLength == 0) { + EmptyDataStream.PUBLISHER.subscribe(subscriber); + } else { + subscriber.onSubscribe(new Subscription(subscriber)); + } + } + + // Zero-copy byte-buffer subscription. + private final class Subscription implements Flow.Subscription { + private final AtomicBoolean completed = new AtomicBoolean(); + private final Flow.Subscriber subscriber; + + Subscription(Flow.Subscriber subscriber) { + this.subscriber = subscriber; + } + + @Override + public void request(long n) { + if (n <= 0 || !completed.compareAndSet(false, true)) { + return; + } + subscriber.onNext(buffer.duplicate()); + subscriber.onComplete(); + } + + @Override + public void cancel() { + completed.set(true); + } } } diff --git a/io/src/main/java/software/amazon/smithy/java/io/datastream/DataStream.java b/io/src/main/java/software/amazon/smithy/java/io/datastream/DataStream.java index b309203622..b0cc5d5f04 100644 --- a/io/src/main/java/software/amazon/smithy/java/io/datastream/DataStream.java +++ b/io/src/main/java/software/amazon/smithy/java/io/datastream/DataStream.java @@ -14,7 +14,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.util.Objects; +import java.util.List; import java.util.concurrent.Flow; /** @@ -257,6 +257,26 @@ static DataStream ofBytes(byte[] bytes, int offset, int length, String contentTy return new ByteBufferDataStream(ByteBuffer.wrap(bytes, offset, length), contentType); } + /** + * Create a DataStream from a list of in-memory {@link ByteBuffer}s. + * + *

The buffers are not stitched into a single buffer until {@link #asByteBuffer()} is + * called, so consumers that subscribe (or write directly to an output stream) avoid the + * concatenation copy. + * + * @param buffers buffers to expose in order. + * @param contentLength sum of all actual data in each buffer. + * @param contentType content-type of the data, if known. + * @return the created DataStream. + */ + static DataStream ofByteBuffers(List buffers, long contentLength, String contentType) { + return switch (buffers.size()) { + case 0 -> EmptyDataStream.INSTANCE; + case 1 -> ofByteBuffer(buffers.getFirst(), contentType); + default -> new MultiByteBufferDataStream(buffers, contentLength, contentType); + }; + } + /** * Create a DataStream from a ByteBuffer. * @@ -343,37 +363,51 @@ static DataStream ofPublisher( /** * Creates DataStream that returns potentially more specific metadata about the stream. * - *

This might be necessary if the payload of a request is streaming, but an HTTP response gave a Content-Length. + *

Useful when, e.g., the request body publisher reports unknown length but the HTTP + * response carries an authoritative Content-Length header that should override it. + * + *

Returns the input {@code ds} unchanged when none of the requested overrides differ + * from what {@code ds} already reports. * * @param ds The DataStream to wrap, if necessary. - * @param contentType Content-Type to associate with the stream. Can be null to not attempt to alter it. - * @param contentLength Content length of the stream. Can be null to not attempt to alter it. - * @param isReplayable True if the publisher can be replayed. Can be null to not attempt to alter it. - * @return the wrapped DataStream. + * @param contentType Content-Type to associate. Pass null to inherit from {@code ds}. + * @param contentLength Content-Length to associate. Use {@code -1} to inherit from {@code ds}, + * otherwise {@code 0} or greater for the byte length. + * @return the wrapped DataStream, or {@code ds} unchanged if no override changes anything. */ - static DataStream withMetadata(DataStream ds, String contentType, Long contentLength, Boolean isReplayable) { - boolean isChanged = false; - var changedContentType = ds.contentType(); - var changedContentLength = ds.contentLength(); - var changedIsReplayable = ds.isReplayable(); - - if (contentType != null && !Objects.equals(contentType, ds.contentType())) { - isChanged = true; - changedContentType = contentType; - } + static DataStream withMetadata(DataStream ds, String contentType, long contentLength) { + return withMetadata(ds, contentType, contentLength, ds.isReplayable()); + } - if (contentLength != null && !Objects.equals(contentLength, ds.contentLength())) { - isChanged = true; - changedContentLength = contentLength; - } + /** + * Creates a DataStream that exposes potentially-different metadata than the underlying stream. + * + *

Useful when, e.g., the request body publisher reports unknown length but the HTTP + * response carries an authoritative Content-Length header that should override it. + * + *

Returns the input {@code ds} unchanged when none of the requested overrides differ + * from what {@code ds} already reports. + * + * @param ds The DataStream to wrap, if necessary. + * @param contentType Content-Type to associate. Pass null to inherit from {@code ds}. + * @param contentLength Content-Length to associate. Use {@code -1} to inherit from {@code ds}, + * otherwise {@code 0} or greater for the byte length. + * @param isReplayable Replayability to associate. + * @return the wrapped DataStream, or {@code ds} unchanged if no override changes anything. + */ + static DataStream withMetadata(DataStream ds, String contentType, long contentLength, boolean isReplayable) { + boolean changeContentType = contentType != null && !contentType.equals(ds.contentType()); + boolean changeContentLength = contentLength >= 0 && contentLength != ds.contentLength(); + boolean changeReplayable = isReplayable != ds.isReplayable(); - if (isReplayable != null && !Objects.equals(isReplayable, ds.isReplayable())) { - isChanged = true; - changedIsReplayable = isReplayable; + if (!changeContentType && !changeContentLength && !changeReplayable) { + return ds; } - return isChanged - ? new WrappedDataStream(ds, changedContentLength, changedContentType, changedIsReplayable) - : ds; + return new WrappedDataStream( + ds, + changeContentLength ? contentLength : ds.contentLength(), + changeContentType ? contentType : ds.contentType(), + isReplayable); } } diff --git a/io/src/main/java/software/amazon/smithy/java/io/datastream/EmptyDataStream.java b/io/src/main/java/software/amazon/smithy/java/io/datastream/EmptyDataStream.java index 08b2135dcc..3e88b96934 100644 --- a/io/src/main/java/software/amazon/smithy/java/io/datastream/EmptyDataStream.java +++ b/io/src/main/java/software/amazon/smithy/java/io/datastream/EmptyDataStream.java @@ -7,15 +7,39 @@ import java.io.InputStream; import java.io.OutputStream; -import java.net.http.HttpRequest; import java.nio.ByteBuffer; import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicBoolean; final class EmptyDataStream implements DataStream { + /** + * Minimal empty publisher. + */ + static final Flow.Publisher PUBLISHER = subscriber -> { + subscriber.onSubscribe(new Flow.Subscription() { + private final AtomicBoolean done = new AtomicBoolean(); + + @Override + public void request(long n) { + if (!done.compareAndSet(false, true)) { + return; + } + if (n <= 0) { + subscriber.onError(new IllegalArgumentException("non-positive subscription request")); + } else { + subscriber.onComplete(); + } + } + + @Override + public void cancel() { + done.set(true); + } + }); + }; static final EmptyDataStream INSTANCE = new EmptyDataStream(); private static final byte[] EMPTY_BYTES = new byte[0]; - private static final Flow.Publisher PUBLISHER = HttpRequest.BodyPublishers.noBody(); @Override public ByteBuffer asByteBuffer() { diff --git a/io/src/main/java/software/amazon/smithy/java/io/datastream/MultiByteBufferDataStream.java b/io/src/main/java/software/amazon/smithy/java/io/datastream/MultiByteBufferDataStream.java new file mode 100644 index 0000000000..7ffdc8d54f --- /dev/null +++ b/io/src/main/java/software/amazon/smithy/java/io/datastream/MultiByteBufferDataStream.java @@ -0,0 +1,168 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.io.datastream; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.SequenceInputStream; +import java.nio.ByteBuffer; +import java.util.Enumeration; +import java.util.List; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicBoolean; +import software.amazon.smithy.java.io.ByteBufferUtils; + +/** + * Replayable DataStream backed by two or more {@link ByteBuffer}s. + * + *

Useful for HTTP response handlers that aggregate chunks delivered by the wire without stitching them into a + * single contiguous buffer. {@link #asByteBuffer()} stitches lazily and caches the result; {@link #subscribe} emits + * the chunks downstream with no copies. + * + *

This class assumes its caller has already short-circuited the empty and single-buffer cases through + * {@link DataStream#ofByteBuffers(List, long, String)} (which routes those to {@link EmptyDataStream} and + * {@link ByteBufferDataStream} respectively). + * + *

Ownership: the supplied list is taken by reference, not defensively copied. Callers MUST NOT mutate the list or + * the buffers' positions/limits after handing them off. The HTTP subscriber path satisfies this trivially: the JDK + * BodySubscriber contract guarantees the JDK will not touch buffers passed to {@code onNext} again, and the subscriber + * itself drops its only reference to the list when it completes. + */ +final class MultiByteBufferDataStream implements DataStream { + + private final List buffers; + private final long contentLength; + private final String contentType; + private volatile ByteBuffer stitched; + + MultiByteBufferDataStream(List buffers, long contentLength, String contentType) { + if (buffers.size() < 2) { + throw new IllegalArgumentException("MultiByteBufferDataStream requires >= 2 buffers; " + + "use DataStream.ofByteBuffers for the general factory."); + } + this.buffers = buffers; + this.contentLength = contentLength; + this.contentType = contentType; + } + + @Override + public boolean isReplayable() { + return true; + } + + @Override + public boolean isAvailable() { + return true; + } + + @Override + public boolean hasKnownLength() { + return true; + } + + @Override + public long contentLength() { + return contentLength; + } + + @Override + public String contentType() { + return contentType; + } + + @Override + public boolean hasByteBuffer() { + // The contiguous buffer is materialized lazily by asByteBuffer(). + return false; + } + + @Override + public ByteBuffer asByteBuffer() { + var local = stitched; + if (local != null) { + return local.duplicate(); + } + // First call materializes a single contiguous buffer and caches it for subsequent + // calls. We size the destination from the known total content length to avoid any + // growth/resize. + var total = ByteBuffer.allocate((int) contentLength); + for (var buf : buffers) { + total.put(buf.duplicate()); + } + total.flip(); + stitched = total; + return total.duplicate(); + } + + @Override + public InputStream asInputStream() { + // Build a SequenceInputStream over per-buffer ByteArrayInputStreams. We chain + // wrappers rather than aggregating once because `asInputStream` is allowed to be + // a streaming view over the underlying data. + var iter = buffers.iterator(); + return new SequenceInputStream(new Enumeration<>() { + @Override + public boolean hasMoreElements() { + return iter.hasNext(); + } + + @Override + public InputStream nextElement() { + var buf = iter.next().duplicate(); + if (buf.hasArray()) { + return new ByteArrayInputStream(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining()); + } + return ByteBufferUtils.byteBufferInputStream(buf); + } + }); + } + + @Override + public void writeTo(OutputStream out) throws IOException { + for (var buf : buffers) { + var dup = buf.duplicate(); + if (dup.hasArray()) { + out.write(dup.array(), dup.arrayOffset() + dup.position(), dup.remaining()); + } else { + var tmp = new byte[dup.remaining()]; + dup.get(tmp); + out.write(tmp); + } + } + } + + @Override + public void subscribe(Flow.Subscriber subscriber) { + subscriber.onSubscribe(new Subscription(subscriber)); + } + + private final class Subscription implements Flow.Subscription { + private final AtomicBoolean done = new AtomicBoolean(); + private final Flow.Subscriber subscriber; + + Subscription(Flow.Subscriber subscriber) { + this.subscriber = subscriber; + } + + @Override + public void request(long n) { + if (n <= 0 || !done.compareAndSet(false, true)) { + return; + } + for (var buf : buffers) { + subscriber.onNext(buf.duplicate()); + } + subscriber.onComplete(); + } + + @Override + public void cancel() { + done.set(true); + } + } +} diff --git a/io/src/main/java/software/amazon/smithy/java/io/uri/URLEncoding.java b/io/src/main/java/software/amazon/smithy/java/io/uri/URLEncoding.java index a64523c081..eba2b2bb4f 100644 --- a/io/src/main/java/software/amazon/smithy/java/io/uri/URLEncoding.java +++ b/io/src/main/java/software/amazon/smithy/java/io/uri/URLEncoding.java @@ -42,16 +42,16 @@ private URLEncoding() {} } /** - * Returns true iff {@code c} is an RFC 3986 unreserved ASCII character. + * Returns {@code true} iff {@code c} is an RFC 3986 unreserved ASCII character + * (alphanumeric, '-', '.', '_', or '~'). Non-ASCII codepoints always return + * {@code false}. */ - private static boolean isUnreserved(int c) { + public static boolean isUnreserved(int c) { if (c >= 128) { return false; } - return c < 64 - ? ((UNRESERVED_LOW >>> c) & 1L) != 0L - : ((UNRESERVED_HIGH >>> (c - 64)) & 1L) != 0L; + return c < 64 ? ((UNRESERVED_LOW >>> c) & 1L) != 0L : ((UNRESERVED_HIGH >>> (c - 64)) & 1L) != 0L; } /** diff --git a/io/src/test/java/software/amazon/smithy/java/io/datastream/WrappedDataStreamTest.java b/io/src/test/java/software/amazon/smithy/java/io/datastream/WrappedDataStreamTest.java index 1d7044365b..2c4e262725 100644 --- a/io/src/test/java/software/amazon/smithy/java/io/datastream/WrappedDataStreamTest.java +++ b/io/src/test/java/software/amazon/smithy/java/io/datastream/WrappedDataStreamTest.java @@ -30,7 +30,7 @@ public void returnsByteBuffer() { @Test public void delegatesIsAvailableToUnderlyingStream() { var ds = DataStream.ofInputStream(new ByteArrayInputStream("foo".getBytes(StandardCharsets.UTF_8))); - var wrapped = DataStream.withMetadata(ds, "text/plain", 3L, null); + var wrapped = DataStream.withMetadata(ds, "text/plain", 3L); assertThat(wrapped.isAvailable(), is(true)); ds.asInputStream();