Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 2 additions & 16 deletions Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Matrix Types:

Parameters:

EVENT_LOOP: 'LIBEV' (Default), 'GEVENT', 'EVENTLET', 'ASYNCIO', 'ASYNCORE', 'TWISTED'
EVENT_LOOP: 'LIBEV' (Default), 'ASYNCIO', 'ASYNCORE'
CYTHON: Default, 'True', 'False'

*/
Expand Down Expand Up @@ -296,8 +296,6 @@ def executeStandardTests() {

failure=0
EVENT_LOOP=${EVENT_LOOP} VERIFY_CYTHON=${CYTHON_ENABLED} JVM_EXTRA_OPTS="$JVM_EXTRA_OPTS -Xss384k" pytest -s -v --log-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --junit-xml=unit_results.xml tests/unit/ || failure=1
EVENT_LOOP_MANAGER=eventlet VERIFY_CYTHON=${CYTHON_ENABLED} JVM_EXTRA_OPTS="$JVM_EXTRA_OPTS -Xss384k" pytest -s -v --log-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --junit-xml=unit_eventlet_results.xml tests/unit/io/test_eventletreactor.py || failure=1
EVENT_LOOP_MANAGER=gevent VERIFY_CYTHON=${CYTHON_ENABLED} JVM_EXTRA_OPTS="$JVM_EXTRA_OPTS -Xss384k" pytest -s -v --log-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --junit-xml=unit_gevent_results.xml tests/unit/io/test_geventreactor.py || failure=1
exit $failure
'''
} catch (err) {
Expand Down Expand Up @@ -688,7 +686,7 @@ pipeline {
</table>''')
choice(
name: 'EVENT_LOOP',
choices: ['LIBEV', 'GEVENT', 'EVENTLET', 'ASYNCIO', 'ASYNCORE', 'TWISTED'],
choices: ['LIBEV', 'ASYNCIO', 'ASYNCORE'],
description: '''<p>Event loop manager to utilize for scheduled or adhoc builds</p>
<table style="width:100%">
<col width="25%">
Expand All @@ -701,14 +699,6 @@ pipeline {
<td><strong>LIBEV</strong></td>
<td>A full-featured and high-performance event loop that is loosely modeled after libevent, but without its limitations and bugs</td>
</tr>
<tr>
<td><strong>GEVENT</strong></td>
<td>A co-routine -based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libev or libuv event loop</td>
</tr>
<tr>
<td><strong>EVENTLET</strong></td>
<td>A concurrent networking library for Python that allows you to change how you run your code, not how you write it</td>
</tr>
<tr>
<td><strong>ASYNCIO</strong></td>
<td>A library to write concurrent code using the async/await syntax</td>
Expand All @@ -717,10 +707,6 @@ pipeline {
<td><strong>ASYNCORE</strong></td>
<td>A module provides the basic infrastructure for writing asynchronous socket service clients and servers</td>
</tr>
<tr>
<td><strong>TWISTED</strong></td>
<td>An event-driven networking engine written in Python and licensed under the open source MIT license</td>
</tr>
</table>''')
choice(
name: 'CI_SCHEDULE',
Expand Down
16 changes: 0 additions & 16 deletions benchmarks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,6 @@
except (ImportError, SyntaxError):
pass

have_twisted = False
try:
from cassandra.io.twistedreactor import TwistedConnection
have_twisted = True
supported_reactors.append(TwistedConnection)
except ImportError as exc:
log.exception("Error importing twisted")
pass

KEYSPACE = "testkeyspace" + str(int(time.time()))
TABLE = "testtable"

Expand Down Expand Up @@ -230,8 +221,6 @@ def parse_options():
help='only benchmark with asyncio connections')
parser.add_option('--libev-only', action='store_true', dest='libev_only',
help='only benchmark with libev connections')
parser.add_option('--twisted-only', action='store_true', dest='twisted_only',
help='only benchmark with Twisted connections')
parser.add_option('-m', '--metrics', action='store_true', dest='enable_metrics',
help='enable and print metrics for operations')
parser.add_option('-l', '--log-level', default='info',
Expand Down Expand Up @@ -271,11 +260,6 @@ def parse_options():
log.error("libev is not available")
sys.exit(1)
options.supported_reactors = [LibevConnection]
elif options.twisted_only:
if not have_twisted:
log.error("Twisted is not available")
sys.exit(1)
options.supported_reactors = [TwistedConnection]
else:
options.supported_reactors = supported_reactors
if not have_libev:
Expand Down
94 changes: 3 additions & 91 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,55 +93,11 @@
from cassandra.datastax.graph.query import _request_timeout_key, _GraphSONContextRowFactory
from cassandra.datastax import cloud as dscloud

try:
from cassandra.io.twistedreactor import TwistedConnection
except ImportError:
TwistedConnection = None

try:
from cassandra.io.eventletreactor import EventletConnection
# PYTHON-1364
#
# At the moment eventlet initialization is chucking AttributeErrors due to its dependence on pyOpenSSL
# and some changes in Python 3.12 which have some knock-on effects there.
except (ImportError, AttributeError):
EventletConnection = None

try:
from weakref import WeakSet
except ImportError:
from cassandra.util import WeakSet # NOQA

def _is_gevent_monkey_patched():
if 'gevent.monkey' not in sys.modules:
return False
import gevent.socket
return socket.socket is gevent.socket.socket

def _try_gevent_import():
if _is_gevent_monkey_patched():
from cassandra.io.geventreactor import GeventConnection
return (GeventConnection,None)
else:
return (None,None)

def _is_eventlet_monkey_patched():
if 'eventlet.patcher' not in sys.modules:
return False
try:
import eventlet.patcher
return eventlet.patcher.is_monkey_patched('socket')
# Another case related to PYTHON-1364
except AttributeError:
return False

def _try_eventlet_import():
if _is_eventlet_monkey_patched():
from cassandra.io.eventletreactor import EventletConnection
return (EventletConnection,None)
else:
return (None,None)

def _try_libev_import():
try:
from cassandra.io.libevreactor import LibevConnection
Expand All @@ -168,7 +124,7 @@ def _connection_reduce_fn(val,import_fn):

log = logging.getLogger(__name__)

conn_fns = (_try_gevent_import, _try_eventlet_import, _try_libev_import, _try_asyncore_import)
conn_fns = (_try_libev_import, _try_asyncore_import)
(conn_class, excs) = reduce(_connection_reduce_fn, conn_fns, (None,[]))
if not conn_class:
raise DependencyException("Unable to load a default connection class", excs)
Expand Down Expand Up @@ -878,19 +834,13 @@ def default_retry_policy(self, policy):

* :class:`cassandra.io.asyncorereactor.AsyncoreConnection`
* :class:`cassandra.io.libevreactor.LibevConnection`
* :class:`cassandra.io.eventletreactor.EventletConnection` (requires monkey-patching - see doc for details)
* :class:`cassandra.io.geventreactor.GeventConnection` (requires monkey-patching - see doc for details)
* :class:`cassandra.io.twistedreactor.TwistedConnection`
* EXPERIMENTAL: :class:`cassandra.io.asyncioreactor.AsyncioConnection`

By default, ``AsyncoreConnection`` will be used, which uses
the ``asyncore`` module in the Python standard library.

If ``libev`` is installed, ``LibevConnection`` will be used instead.

If ``gevent`` or ``eventlet`` monkey-patching is detected, the corresponding
connection class will be used automatically.

``AsyncioConnection``, which uses the ``asyncio`` module in the Python
standard library, is also available, but currently experimental. Note that
it requires ``asyncio`` features that were only introduced in the 3.4 line
Expand Down Expand Up @@ -1168,9 +1118,7 @@ def __init__(self,
raise ValueError("contact_points, endpoint_factory, ssl_context, and ssl_options "
"cannot be specified with a cloud configuration")

uses_twisted = TwistedConnection and issubclass(self.connection_class, TwistedConnection)
uses_eventlet = EventletConnection and issubclass(self.connection_class, EventletConnection)
cloud_config = dscloud.get_cloud_config(cloud, create_pyopenssl_context=uses_twisted or uses_eventlet)
cloud_config = dscloud.get_cloud_config(cloud)

ssl_context = cloud_config.ssl_context
ssl_options = {'check_hostname': True}
Expand Down Expand Up @@ -1389,7 +1337,7 @@ def __init__(self,
HostDistance.REMOTE: DEFAULT_MAX_CONNECTIONS_PER_REMOTE_HOST
}

self.executor = self._create_thread_pool_executor(max_workers=executor_threads)
self.executor = ThreadPoolExecutor(max_workers=executor_threads)
self.scheduler = _Scheduler(self.executor)

self._lock = RLock()
Expand All @@ -1411,42 +1359,6 @@ def __init__(self,
if application_version is not None:
self.application_version = application_version

def _create_thread_pool_executor(self, **kwargs):
"""
Create a ThreadPoolExecutor for the cluster. In most cases, the built-in
`concurrent.futures.ThreadPoolExecutor` is used.

Python 3.7+ and Eventlet cause the `concurrent.futures.ThreadPoolExecutor`
to hang indefinitely. In that case, the user needs to have the `futurist`
package so we can use the `futurist.GreenThreadPoolExecutor` class instead.

:param kwargs: All keyword args are passed to the ThreadPoolExecutor constructor.
:return: A ThreadPoolExecutor instance.
"""
tpe_class = ThreadPoolExecutor
if sys.version_info[0] >= 3 and sys.version_info[1] >= 7:
try:
from cassandra.io.eventletreactor import EventletConnection
is_eventlet = issubclass(self.connection_class, EventletConnection)
except:
# Eventlet is not available or can't be detected
return tpe_class(**kwargs)

if is_eventlet:
try:
from futurist import GreenThreadPoolExecutor
tpe_class = GreenThreadPoolExecutor
except ImportError:
# futurist is not available
raise ImportError(
("Python 3.7+ and Eventlet cause the `concurrent.futures.ThreadPoolExecutor` "
"to hang indefinitely. If you want to use the Eventlet reactor, you "
"need to install the `futurist` package to allow the driver to use "
"the GreenThreadPoolExecutor. See https://github.com/eventlet/eventlet/issues/508 "
"for more details."))

return tpe_class(**kwargs)

def register_user_type(self, keyspace, user_type, klass):
"""
Registers a class to use to represent a particular user-defined type.
Expand Down
5 changes: 1 addition & 4 deletions cassandra/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@
import weakref


if 'gevent.monkey' in sys.modules:
from gevent.queue import Queue, Empty
else:
from queue import Queue, Empty # noqa
from queue import Queue, Empty # noqa

from cassandra import ConsistencyLevel, AuthenticationFailed, OperationTimedOut, ProtocolVersion
from cassandra.marshal import int32_pack
Expand Down
31 changes: 5 additions & 26 deletions cassandra/datastax/cloud/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,38 +78,36 @@ def from_dict(cls, d):
return c


def get_cloud_config(cloud_config, create_pyopenssl_context=False):
def get_cloud_config(cloud_config):
if not _HAS_SSL:
raise DriverException("A Python installation with SSL is required to connect to a cloud cluster.")

if 'secure_connect_bundle' not in cloud_config:
raise ValueError("The cloud config doesn't have a secure_connect_bundle specified.")

try:
config = read_cloud_config_from_zip(cloud_config, create_pyopenssl_context)
config = read_cloud_config_from_zip(cloud_config)
except BadZipFile:
raise ValueError("Unable to open the zip file for the cloud config. Check your secure connect bundle.")

config = read_metadata_info(config, cloud_config)
if create_pyopenssl_context:
config.ssl_context = config.pyopenssl_context
return config


def read_cloud_config_from_zip(cloud_config, create_pyopenssl_context):
def read_cloud_config_from_zip(cloud_config):
secure_bundle = cloud_config['secure_connect_bundle']
use_default_tempdir = cloud_config.get('use_default_tempdir', None)
with ZipFile(secure_bundle) as zipfile:
base_dir = tempfile.gettempdir() if use_default_tempdir else os.path.dirname(secure_bundle)
tmp_dir = tempfile.mkdtemp(dir=base_dir)
try:
zipfile.extractall(path=tmp_dir)
return parse_cloud_config(os.path.join(tmp_dir, 'config.json'), cloud_config, create_pyopenssl_context)
return parse_cloud_config(os.path.join(tmp_dir, 'config.json'), cloud_config)
finally:
shutil.rmtree(tmp_dir)


def parse_cloud_config(path, cloud_config, create_pyopenssl_context):
def parse_cloud_config(path, cloud_config):
with open(path, 'r') as stream:
data = json.load(stream)

Expand All @@ -123,11 +121,7 @@ def parse_cloud_config(path, cloud_config, create_pyopenssl_context):
ca_cert_location = os.path.join(config_dir, 'ca.crt')
cert_location = os.path.join(config_dir, 'cert')
key_location = os.path.join(config_dir, 'key')
# Regardless of if we create a pyopenssl context, we still need the builtin one
# to connect to the metadata service
config.ssl_context = _ssl_context_from_cert(ca_cert_location, cert_location, key_location)
if create_pyopenssl_context:
config.pyopenssl_context = _pyopenssl_context_from_cert(ca_cert_location, cert_location, key_location)

return config

Expand Down Expand Up @@ -178,18 +172,3 @@ def _ssl_context_from_cert(ca_cert_location, cert_location, key_location):

return ssl_context


def _pyopenssl_context_from_cert(ca_cert_location, cert_location, key_location):
try:
from OpenSSL import SSL
except ImportError as e:
raise ImportError(
"PyOpenSSL must be installed to connect to Astra with the Eventlet or Twisted event loops")\
.with_traceback(e.__traceback__)
ssl_context = SSL.Context(SSL.TLSv1_METHOD)
ssl_context.set_verify(SSL.VERIFY_PEER, callback=lambda _1, _2, _3, _4, ok: ok)
ssl_context.use_certificate_file(cert_location)
ssl_context.use_privatekey_file(key_location)
ssl_context.load_verify_locations(ca_cert_location)

return ssl_context
6 changes: 1 addition & 5 deletions cassandra/datastax/insights/reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,7 @@ def _get_startup_data(self):
cert_validation = None
try:
if self._session.cluster.ssl_context:
if isinstance(self._session.cluster.ssl_context, ssl.SSLContext):
cert_validation = self._session.cluster.ssl_context.verify_mode == ssl.CERT_REQUIRED
else: # pyopenssl
from OpenSSL import SSL
cert_validation = self._session.cluster.ssl_context.get_verify_mode() != SSL.VERIFY_NONE
cert_validation = self._session.cluster.ssl_context.verify_mode == ssl.CERT_REQUIRED
elif self._session.cluster.ssl_options:
cert_validation = self._session.cluster.ssl_options.get('cert_reqs') == ssl.CERT_REQUIRED
except Exception as e:
Expand Down
Loading