This repository was archived by the owner on Jan 16, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 21
Expand file tree
/
Copy pathDataIngesterService.java
More file actions
106 lines (93 loc) · 3.51 KB
/
DataIngesterService.java
File metadata and controls
106 lines (93 loc) · 3.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package com.wavefront.sdk.direct.ingestion;
import com.google.common.io.ByteStreams;
import com.wavefront.sdk.common.clients.WavefrontClientFactory;
import com.wavefront.sdk.common.clients.service.ReportingService;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPOutputStream;
/**
 * DataIngester service that reports entities to Wavefront over HTTP(S).
 *
 * <p>Every call to {@link #report(String, InputStream)} opens a connection to
 * {@code <server>/report?f=<format>}, sends the payload gzip-compressed with a bearer token and
 * returns the HTTP status code of the response.
 *
 * <p>This class will be removed in future versions in favor of
 * {@link com.wavefront.sdk.common.clients.WavefrontClientFactory} to construct Proxy and
 * DirectDataIngestion senders.
 *
 * @author Sushant Dewan (sushant@wavefront.com).
 * @version $Id: $Id
 * @deprecated use {@link com.wavefront.sdk.common.clients.WavefrontClientFactory} instead.
 */
@Deprecated
public class DataIngesterService implements DataIngesterAPI {

  // Named after this class so that log records are attributed to the component that emits them.
  private static final Logger log = Logger.getLogger(DataIngesterService.class.getName());

  private static final int CONNECT_TIMEOUT_MILLIS = 30000;
  private static final int READ_TIMEOUT_MILLIS = 10000;

  /** Status returned when not even an HTTP status code could be obtained from the connection. */
  private static final int NO_HTTP_RESPONSE = -1;

  /** Token sent as {@code Authorization: Bearer <token>} on every request. */
  private final String token;

  /** Base URI of the server; the report endpoint is resolved relative to its path. */
  private final URI uri;

  /**
   * Creates a service that reports to the given server.
   *
   * @param server base URL of the server, parsed with {@link URI#create(String)}
   * @param token token placed in the {@code Authorization} header of each request
   * @throws IllegalArgumentException if {@code server} is not a valid URI
   */
  DataIngesterService(String server, String token) {
    this.token = token;
    this.uri = URI.create(server);
  }

  /**
   * {@inheritDoc}
   *
   * <p>The content of {@code stream} is gzip-compressed and posted to
   * {@code <server>/report?f=<format>}. The given stream is read but not closed by this method.
   *
   * @return the HTTP status code of the response; {@code 400} if the connection could not even be
   *     created; {@code -1} if the request failed and no status code could be obtained
   */
  @Override
  public int report(String format, InputStream stream) {
    /*
     * Refer https://docs.oracle.com/javase/8/docs/technotes/guides/net/http-keepalive.html
     * for details around why this code is written as it is.
     */
    int statusCode = HttpURLConnection.HTTP_BAD_REQUEST;
    HttpURLConnection urlConn = null;
    try {
      URL url = new URL(uri.getScheme(), uri.getHost(), uri.getPort(), reportPath(format));
      urlConn = (HttpURLConnection) url.openConnection();
      urlConn.setDoOutput(true);
      urlConn.addRequestProperty("Content-Type", "application/octet-stream");
      urlConn.addRequestProperty("Content-Encoding", "gzip");
      urlConn.addRequestProperty("Authorization", "Bearer " + token);
      urlConn.setConnectTimeout(CONNECT_TIMEOUT_MILLIS);
      urlConn.setReadTimeout(READ_TIMEOUT_MILLIS);

      try (GZIPOutputStream gzipOS = new GZIPOutputStream(urlConn.getOutputStream())) {
        ByteStreams.copy(stream, gzipOS);
        gzipOS.flush();
      }

      statusCode = urlConn.getResponseCode();
      // Drain the response body so the underlying connection can be reused (keep-alive).
      readAndClose(urlConn.getInputStream());
    } catch (IOException ex) {
      if (urlConn != null) {
        // The request was at least attempted (an IOException here is also how HttpURLConnection
        // surfaces HTTP error responses): report the real status and drain the error stream.
        return safeGetResponseCodeAndClose(urlConn);
      }
      // The connection could not be created at all; do not fail silently.
      log.log(Level.WARNING,
          "Unable to open a connection to the Wavefront service at " + uri, ex);
    }
    return statusCode;
  }

  /**
   * Builds the path-and-query part of the report URL for the given format.
   *
   * <p>Trailing slashes of the configured server path are dropped so that a server given as
   * {@code https://host/} yields {@code /report?f=...} rather than {@code //report?f=...}.
   */
  private String reportPath(String format) {
    String basePath = uri.getPath() != null ? uri.getPath() : "";
    int end = basePath.length();
    while (end > 0 && basePath.charAt(end - 1) == '/') {
      end--;
    }
    return basePath.substring(0, end) + "/report?f=" + format;
  }

  /**
   * Obtains the status code of a failed request and drains the connection's error stream.
   *
   * <p>Failures of either step are logged and never propagated.
   *
   * @return the HTTP status code, or {@code -1} if it could not be obtained
   */
  private int safeGetResponseCodeAndClose(HttpURLConnection urlConn) {
    int statusCode;
    try {
      statusCode = urlConn.getResponseCode();
    } catch (IOException ex) {
      log.log(Level.SEVERE, "Unable to obtain status code from the Wavefront service at "
          + urlConn.getURL().toString(), ex);
      statusCode = NO_HTTP_RESPONSE;
    }

    try {
      readAndClose(urlConn.getErrorStream());
    } catch (IOException ex) {
      log.log(Level.SEVERE, "Unable to read and close error stream from the Wavefront service at "
          + urlConn.getURL().toString(), ex);
    }

    return statusCode;
  }

  /**
   * Reads the given stream until no more bytes are returned and then closes it.
   *
   * @param stream stream to drain; {@code null} is accepted and ignored
   * @throws IOException if reading or closing the stream fails
   */
  private void readAndClose(InputStream stream) throws IOException {
    if (stream != null) {
      try (InputStream is = stream) {
        byte[] buffer = new byte[4096];
        // read entire stream before closing
        while (is.read(buffer) > 0) {}
      }
    }
  }
}