Skip to content

Commit 65f8834

Browse files
authored
Fix potential data loss when using direct ingestion (#76)
* Fix the thread blocking issue
* Add points back to buffer when sending points fails
1 parent 79f0640 commit 65f8834

4 files changed

Lines changed: 116 additions & 39 deletions

File tree

open_source_license.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
open_source_license.txt
22

3-
Wavefront by VMware SDK for Python 1.7.5 GA
3+
Wavefront by VMware SDK for Python 1.7.6 GA
44

55
======================================================================
66

@@ -291,4 +291,4 @@ Source Files is valid for three years from the date you acquired or last used th
291291
Software product. Alternatively, the Source Files may accompany the
292292
VMware service.
293293

294-
[WAVEFRONTSDKPYTHON15GANT070819]
294+
[WAVEFRONTSDKPYTHON176GANT070819]

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
setuptools.setup(
1717
name='wavefront-sdk-python',
18-
version='1.7.5',
18+
version='1.7.6',
1919
author='Wavefront by VMware',
2020
author_email='chitimba@wavefront.com',
2121
url='https://github.com/wavefrontHQ/wavefront-sdk-python',

wavefront_sdk/common/constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,6 @@
5656

5757
# Distribution name for sdk
5858
WAVEFRONT_SDK_PYTHON = 'wavefront-sdk-python'
59+
60+
# Default http status code for sending points
61+
NO_HTTP_RESPONSE = -1

wavefront_sdk/direct.py

Lines changed: 110 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ def _report(self, points, data_format, entity_prefix, report_errors):
185185
@param data_format: Type of data to be sent
186186
@type data_format: str
187187
"""
188+
status_code = constants.NO_HTTP_RESPONSE
188189
try:
189190
if data_format == self.WAVEFRONT_EVENT_FORMAT:
190191
response = requests.post(self.server + self.EVENT_END_POINT,
@@ -198,23 +199,29 @@ def _report(self, points, data_format, entity_prefix, report_errors):
198199
params=params,
199200
headers=self._headers,
200201
data=compressed_data)
201-
202-
self._sdk_metrics_registry.new_delta_counter(
203-
'{}.report.{}'.format(entity_prefix,
204-
response.status_code)).inc()
205-
response.raise_for_status()
206-
except Exception as error:
202+
status_code = response.status_code
203+
self._sdk_metrics_registry.new_delta_counter('{}.report.{}'.format(
204+
entity_prefix, status_code)).inc()
205+
except requests.exceptions.RequestException:
207206
report_errors.inc()
208-
raise error
207+
return status_code
209208

210209
def _batch_report(self, batch_line_data, data_format, entity_prefix,
211-
report_errors):
210+
report_errors, data_buffer, dropped_point_counter):
212211
"""One api call sending one given list of data.
213212
214213
@param batch_line_data: List of data to be sent
215214
@type batch_line_data: list
216215
@param data_format: Type of data to be sent
217216
@type data_format: str
217+
@param entity_prefix: Metric prefix of data type
218+
@type entity_prefix: str
219+
@param report_errors: Counter of errors
220+
@type report_errors: WavefrontSdkCounter
221+
@param data_buffer: Data buffer to be flush and sent
222+
@type data_buffer: Queue
223+
@param dropped_point_counter: Counter of dropped points
224+
@type dropped_point_counter: WavefrontSdkCounter
218225
"""
219226
# Sending events through direct ingestion does not support batching.
220227
batch_size = int(
@@ -223,31 +230,91 @@ def _batch_report(self, batch_line_data, data_format, entity_prefix,
223230
# Split data into chunks, each with the size of given batch_size
224231
for batch in utils.chunks(batch_line_data, batch_size):
225232
# report once per batch
233+
status_code = self._report('\n'.join(batch) + '\n', data_format,
234+
entity_prefix, report_errors)
235+
if 400 <= status_code <= 599 or status_code == -1:
236+
if status_code == 401:
237+
logging.error(
238+
'Failed to report %s data points to wavefront '
239+
'(HTTP %d). Please verify that your API Token is '
240+
'correct! All %s data points are discarded. ',
241+
data_format, status_code, data_format)
242+
dropped_point_counter.inc(len(batch))
243+
elif status_code == 403:
244+
if data_format == self.WAVEFRONT_METRIC_FORMAT:
245+
logging.error(
246+
'Failed to report %s data points to wavefront '
247+
'(HTTP %d). Please verify that Direct Data '
248+
'Ingestion is enabled for your account! '
249+
'All %s data points are discarded. ', data_format,
250+
status_code, data_format)
251+
dropped_point_counter.inc(len(batch))
252+
else:
253+
logging.error(
254+
'Failed to report %s data points to wavefront '
255+
'(HTTP %d). Please verify that Direct Data '
256+
'Ingestion and %s data points are enabled for '
257+
'your account! All %s data points are discarded. ',
258+
data_format, status_code, data_format, data_format)
259+
dropped_point_counter.inc(len(batch))
260+
else:
261+
logging.error(
262+
'Failed to report %s data points to wavefront '
263+
'(HTTP %d). Data will be requeued and resent.',
264+
data_format, status_code)
265+
self._requeue(batch, data_format, data_buffer,
266+
dropped_point_counter)
267+
268+
@staticmethod
269+
def _requeue(points, data_format, data_buffer, dropped_point_counter):
270+
"""Add point data back to buffer queue.
271+
272+
@param points: Point data in line format
273+
@type points: List[str]
274+
@param data_format: Type of data to be sent
275+
@type data_format: str
276+
@param data_buffer: Data buffer to be flush and sent
277+
@type data_buffer: Queue
278+
@param dropped_point_counter: Counter of dropped points
279+
@type dropped_point_counter: WavefrontSdkCounter
280+
"""
281+
added_back_to_buffer_count = 0
282+
for point in points:
226283
try:
227-
self._report('\n'.join(batch) + '\n', data_format,
228-
entity_prefix, report_errors)
229-
# pylint: disable=broad-except,fixme
230-
# TODO: Please replace a generic Exception with a specific one.
231-
except Exception as error:
284+
data_buffer.put_nowait(point)
285+
except queue.Full:
286+
dropped_point_count = len(points) - added_back_to_buffer_count
287+
dropped_point_counter.inc(dropped_point_count)
232288
logging.error(
233-
'Failed to report %s data points to wavefront %s',
234-
data_format, error)
289+
'Buffer full, dropping %d %s data points.'
290+
'Consider increasing the batch size of '
291+
'your sender to increase throughput.', dropped_point_count,
292+
data_format)
293+
break
294+
added_back_to_buffer_count += 1
235295

236296
def _internal_flush(self, data_buffer, data_format, entity_prefix,
237-
report_errors):
297+
report_errors, dropped_point_counter):
238298
"""Get all data from one data buffer to a list, and report that list.
239299
240300
@param data_buffer: Data buffer to be flush and sent
241-
@type: Queue
301+
@type data_buffer: Queue
242302
@param data_format: Type of data to be sent
243-
@type: str
303+
@type data_format: str
304+
@param entity_prefix: Metric prefix of data type
305+
@type entity_prefix: str
306+
@param report_errors: Counter of errors
307+
@type report_errors: WavefrontSdkCounter
308+
@param dropped_point_counter: Counter of dropped points
309+
@type dropped_point_counter: WavefrontSdkCounter
244310
"""
245311
data = []
246312
size = data_buffer.qsize()
247313
while size > 0 and not data_buffer.empty():
248314
data.append(data_buffer.get())
249315
size -= 1
250-
self._batch_report(data, data_format, entity_prefix, report_errors)
316+
self._batch_report(data, data_format, entity_prefix, report_errors,
317+
data_buffer, dropped_point_counter)
251318

252319
def _schedule_timer(self):
253320
# Flush every 5 secs by default
@@ -270,19 +337,21 @@ def flush_now(self):
270337
"""Flush all the data buffer immediately."""
271338
self._internal_flush(self._metrics_buffer,
272339
self.WAVEFRONT_METRIC_FORMAT, 'points',
273-
self._points_report_errors)
340+
self._points_report_errors, self._points_dropped)
274341
self._internal_flush(self._histograms_buffer,
275342
self.WAVEFRONT_HISTOGRAM_FORMAT, 'histograms',
276-
self._histograms_report_errors)
343+
self._histograms_report_errors,
344+
self._histograms_dropped)
277345
self._internal_flush(self._tracing_spans_buffer,
278346
self.WAVEFRONT_TRACING_SPAN_FORMAT, 'spans',
279-
self._spans_report_errors)
347+
self._spans_report_errors, self._spans_dropped)
280348
self._internal_flush(self._spans_log_buffer,
281349
self.WAVEFRONT_SPAN_LOG_FORMAT, 'span_logs',
282-
self._span_logs_report_errors)
283-
self._internal_flush(self._events_buffer,
284-
self.WAVEFRONT_EVENT_FORMAT, 'events',
285-
self._events_report_errors)
350+
self._span_logs_report_errors,
351+
self._span_logs_dropped)
352+
self._internal_flush(self._events_buffer, self.WAVEFRONT_EVENT_FORMAT,
353+
'events', self._events_report_errors,
354+
self._events_dropped)
286355

287356
def close(self):
288357
"""Flush all buffer before close the client."""
@@ -320,7 +389,7 @@ def send_metric(self, name, value, timestamp, source, tags):
320389
self._points_invalid.inc()
321390
raise error
322391
try:
323-
self._metrics_buffer.put(line_data)
392+
self._metrics_buffer.put_nowait(line_data)
324393
except queue.Full as error:
325394
self._points_dropped.inc()
326395
raise error
@@ -335,7 +404,8 @@ def send_metric_now(self, metrics):
335404
@type metrics: list[str]
336405
"""
337406
self._batch_report(metrics, self.WAVEFRONT_METRIC_FORMAT, 'points',
338-
self._points_report_errors)
407+
self._points_report_errors, self._metrics_buffer,
408+
self._points_dropped)
339409

340410
def send_distribution(self, name, centroids, histogram_granularities,
341411
timestamp, source, tags):
@@ -369,7 +439,7 @@ def send_distribution(self, name, centroids, histogram_granularities,
369439
self._histograms_invalid.inc()
370440
raise error
371441
try:
372-
self._histograms_buffer.put(line_data)
442+
self._histograms_buffer.put_nowait(line_data)
373443
except queue.Full as error:
374444
self._histograms_dropped.inc()
375445
raise error
@@ -384,7 +454,8 @@ def send_distribution_now(self, distributions):
384454
@type distributions: list[str]
385455
"""
386456
self._batch_report(distributions, self.WAVEFRONT_HISTOGRAM_FORMAT,
387-
'histograms', self._histograms_report_errors)
457+
'histograms', self._histograms_report_errors,
458+
self._histograms_buffer, self._histograms_dropped)
388459

389460
# pylint: disable=too-many-arguments
390461

@@ -431,7 +502,7 @@ def send_span(self, name, start_millis, duration_millis, source, trace_id,
431502
self._spans_invalid.inc()
432503
raise error
433504
try:
434-
self._tracing_spans_buffer.put(line_data)
505+
self._tracing_spans_buffer.put_nowait(line_data)
435506
except queue.Full as error:
436507
self._spans_dropped.inc()
437508
raise error
@@ -444,7 +515,7 @@ def send_span(self, name, start_millis, duration_millis, source, trace_id,
444515
self._span_logs_invalid.inc()
445516
raise error
446517
try:
447-
self._spans_log_buffer.put(line_data)
518+
self._spans_log_buffer.put_nowait(line_data)
448519
except queue.Full as error:
449520
self._span_logs_dropped.inc()
450521
raise error
@@ -460,7 +531,8 @@ def send_span_now(self, spans):
460531
@type spans: list[str]
461532
"""
462533
self._batch_report(spans, self.WAVEFRONT_TRACING_SPAN_FORMAT, 'spans',
463-
self._spans_report_errors)
534+
self._spans_report_errors,
535+
self._tracing_spans_buffer, self._spans_dropped)
464536

465537
def send_span_log_now(self, span_logs):
466538
"""
@@ -473,7 +545,8 @@ def send_span_log_now(self, span_logs):
473545
@type span_logs: list[str]
474546
"""
475547
self._batch_report(span_logs, self.WAVEFRONT_SPAN_LOG_FORMAT,
476-
'span_logs', self._span_logs_report_errors)
548+
'span_logs', self._span_logs_report_errors,
549+
self._spans_log_buffer, self._span_logs_dropped)
477550

478551
def send_event(self, name, start_time, end_time, source, tags,
479552
annotations):
@@ -510,7 +583,7 @@ def send_event(self, name, start_time, end_time, source, tags,
510583
self._events_invalid.inc()
511584
raise error
512585
try:
513-
self._events_buffer.put(line_data)
586+
self._events_buffer.put_nowait(line_data)
514587
except queue.Full as error:
515588
self._events_dropped.inc()
516589
raise error
@@ -525,7 +598,8 @@ def send_event_now(self, events):
525598
@type events: list[str]
526599
"""
527600
self._batch_report(events, self.WAVEFRONT_EVENT_FORMAT, 'events',
528-
self._events_report_errors)
601+
self._events_report_errors, self._events_buffer,
602+
self._events_dropped)
529603

530604
def get_failure_count(self):
531605
"""Get failure count for one connection."""

0 commit comments

Comments
 (0)