Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ name: CI

on:
push:
branches: [ "dev", "master" ]
branches: ["dev", "master"]
pull_request:
branches: [ "dev", "master" ]
workflow_dispatch:

jobs:
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,6 @@ pyrightconfig.json

# End of https://www.toptal.com/developers/gitignore/api/python
*.lock

# Raw EEG recordings
recordings/
9 changes: 0 additions & 9 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,3 @@ repos:
- "pytest"
- "pytest-asyncio"
- "pytest-mock"

- repo: local
hooks:
- id: pytest
name: pytest
entry: pytest
language: system
types: [python]
pass_filenames: false
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Checked with mypy](http://www.mypy-lang.org/static/mypy_badge.svg)](http://mypy-lang.org/)

**Wersja: 0.1.0**

**Bridge** to biblioteka (SDK) i aplikacja wiersza poleceń w Pythonie, która tworzy ujednolicony interfejs do zbierania danych z różnych urządzeń EEG. Działa jako "most" między sprzętem a oprogramowaniem analitycznym.

## Główne Cechy
Expand Down
3 changes: 3 additions & 0 deletions bridge/eeg/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .config import close, init
from .connector import EEGConnector
from .core import DeviceData, EEGArray, EEGDevice
from .fif import FifDevice, FifRecorder

__all__ = [
"DeviceData",
Expand All @@ -9,4 +10,6 @@
"init",
"close",
"EEGConnector",
"FifDevice",
"FifRecorder",
]
15 changes: 13 additions & 2 deletions bridge/eeg/brainaccess/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
from .device import BrainaccessDevice
from .cap_factory import DEVICE_TO_CAP, get_cap_from_model, get_cap_from_name

__all__ = ["BrainaccessDevice"]
__all__ = [
"DEVICE_TO_CAP",
"get_cap_from_model",
"get_cap_from_name",
]

try:
from .device import BrainaccessDevice

__all__ = [*__all__, "BrainaccessDevice"]
except ImportError:
pass
97 changes: 78 additions & 19 deletions bridge/eeg/brainaccess/device.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import multiprocessing
import time
from logging import Logger, getLogger
from queue import Empty, Queue
from typing import Generator

import brainaccess.core.eeg_channel as eeg_channel
import numpy as np
from brainaccess import core
from brainaccess.core.eeg_manager import EEGManager
from brainaccess.utils import acquisition
Expand All @@ -14,6 +18,7 @@
DATA_COLLECTION_TIME,
DEFAULT_BLUETOOTH_ADAPTER,
DEFAULT_DEVICE_PORT,
GAIN_MODE,
IMPEDANCE_MEASUREMENT_TIME,
)

Expand All @@ -27,6 +32,8 @@ def __init__(self, logger: Logger | None = None) -> None:
self._cap: dict[int, str] | None = None
self._mac_address: str | None = None
self._device_name: str | None = None
self._stream_queue: Queue[EEGArray] = Queue()
self._is_streaming: bool = False

super().__init__(logger or getLogger(__name__))

Expand All @@ -43,13 +50,21 @@ def _connect(self, device_name: str, cap: dict[int, str]) -> None:
try:
self._eeg.setup(self._manager, device_name=device_name, cap=cap)
self._electrodes = list(cap.values())
self._logger.info("Connection successful.")
except Exception:
self._manager.__exit__(None, None, None)
raise

self._logger.info("Connection successful.")

def _acq_callback(self, chunk: list[float], chunk_size: int) -> None:
"""Wewnętrzny callback wywoływany przez BrainAccess SDK."""
if self._is_streaming:
chunk_array = np.array(chunk)
num_channels = len(self._electrodes)
eeg_chunk = chunk_array[:num_channels, :].astype(np.float64)

self._stream_queue.put(eeg_chunk)

# IM-032
def _get_device_model(self, port: int) -> str:
status = self._manager.connect(port) # type: ignore[union-attr]
Expand Down Expand Up @@ -78,8 +93,10 @@ def connect(

with connection_lock:
self._logger.debug("Scanning for eeg...")
core.scan(adapter_index=bluetooth_adapter)
count = core.get_device_count()
if bluetooth_adapter != 0:
core.config_set_adapter_index(bluetooth_adapter)
devices = core.scan()
count = len(devices)
self._logger.info(f"Found {count} eeg.")

if count == 0:
Expand All @@ -90,33 +107,79 @@ def connect(
if port >= count:
raise ConnectionError(f"Can't connect on port {port}, found {count} eeg.")

self._device_name = core.get_device_name(port) or "Unknown Device"
self._mac_address = core.get_device_address(port)
self._device_name = devices[port].name or "Unknown Device"
self._mac_address = devices[port].mac_address
self._cap = get_cap_from_name(self._device_name)

if not self._cap:
model = self._get_device_model(port)
self._cap = get_cap_from_model(model)

self._electrodes = list(self._cap.values())

try:
self._connect(self._device_name, self._cap)
return
except Exception as e:
self._logger.exception(e)
if self._manager:
self._manager.__exit__(None, None, None)
self._logger.exception(f"Connection failed: {e}")
raise

# IM-032
def disconnect(self) -> None:
self._ensure_connected()
self._is_streaming = False
self._logger.debug("Disconnecting the device...")
if self._manager:
try:
self._manager.stop_stream()
except Exception:
pass
self._manager.disconnect()
self._manager.__exit__(None, None, None)
# self._manager.destroy()
self._manager = None
self._eeg.close()

self._logger.info("Device disconnected successfully.")

def stream(self) -> Generator[EEGArray, None, None]:
"""
Generator strumieniujący dane EEG w czasie rzeczywistym.
Użycie:
for chunk in device.stream():
process(chunk)
"""
self._ensure_connected()
assert self._manager is not None

while not self._stream_queue.empty():
self._stream_queue.get()

num_channels = len(self._electrodes)
for i in range(num_channels):
self._manager.set_channel_enabled(eeg_channel.ELECTRODE_MEASUREMENT + i, True)
self._manager.set_channel_gain(eeg_channel.ELECTRODE_MEASUREMENT + i, GAIN_MODE)
self._manager.set_channel_bias(eeg_channel.ELECTRODE_MEASUREMENT + i, True)

self._manager.set_channel_enabled(eeg_channel.STREAMING, True)
self._manager.set_callback_chunk(self._acq_callback)

self._is_streaming = True
self._manager.start_stream()
self._logger.info("Started real-time stream.")

try:
while self._is_streaming:
try:
chunk = self._stream_queue.get(timeout=1.0)
yield chunk
except Empty:
continue
finally:
self._is_streaming = False
self._logger.info("Stopped real-time stream.")

# IM-032
def get_impedance(self, duration: float = IMPEDANCE_MEASUREMENT_TIME) -> list[float]:
self._ensure_connected()
Expand Down Expand Up @@ -153,16 +216,12 @@ def get_output(self, duration: float = DATA_COLLECTION_TIME, output_file: str |
self._logger.info("Data acquisition completed.")
return raw_data # type: ignore[no-any-return]

def get_device_data(self) -> DeviceData | None:
def get_device_data(self) -> DeviceData:
self._ensure_connected()
try:
return DeviceData(
name=self._device_name,
mac_address=self._mac_address,
manufacturer=BRAINACCESS_MANUFACTURER,
electrodes_num=len(self._cap) if self._cap else None,
sample_rate=self._manager.get_sample_frequency() if self._manager else None,
)
except Exception as e:
self._logger.exception(f"Failed to fetch device data for device {self.__class__.__name__}: {e}")
return None
return DeviceData(
name=self._device_name,
mac_address=self._mac_address,
manufacturer=BRAINACCESS_MANUFACTURER,
electrodes_num=len(self._cap) if self._cap else None,
sample_rate=self._manager.get_sample_frequency() if self._manager else None,
)
35 changes: 31 additions & 4 deletions bridge/eeg/core/device.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import threading
from abc import ABC, abstractmethod
from collections.abc import Callable
from logging import Logger, getLogger
from types import TracebackType
from typing import Generator

from .device_data import DeviceData
from .typing import EEGArray
Expand All @@ -10,6 +13,29 @@ class EEGDevice(ABC):
def __init__(self, logger: Logger | None = None) -> None:
self._logger = logger or getLogger(__name__)
self._logger.debug(f"{self.__class__.__name__} initialized.")
self._subscribers: list[Callable[[EEGArray], None]] = []
self._push_thread: threading.Thread | None = None

def subscribe(self, callback: Callable[[EEGArray], None]) -> None:
self._subscribers.append(callback)

def start(self) -> None:
self._push_thread = threading.Thread(target=self._push_loop, daemon=True)
self._push_thread.start()

def stop(self) -> None:
self.disconnect()
if self._push_thread is not None:
self._push_thread.join(timeout=5)
self._push_thread = None

def _push_loop(self) -> None:
for chunk in self.stream():
for cb in list(self._subscribers):
try:
cb(chunk)
except Exception:
self._logger.exception("Subscriber %r raised", cb)

@abstractmethod
def connect(self) -> None:
Expand All @@ -19,15 +45,17 @@ def connect(self) -> None:
def disconnect(self) -> None:
pass

@abstractmethod
def get_output(self, duration: float, output_file: str | None = None) -> EEGArray:
pass
raise NotImplementedError(f"Output retrieval not implemented for this class {self.__class__.__name__}.")

def get_impedance(self, duration: float) -> list[float]:
raise NotImplementedError(f"Impedance measurement not implemented for this class {self.__class__.__name__}.")

def stream(self) -> Generator[EEGArray, None, None]:
raise NotImplementedError(f"Streaming not implemented for this class {self.__class__.__name__}.")

@abstractmethod
def get_device_data(self) -> DeviceData | None:
def get_device_data(self) -> DeviceData:
pass

def __enter__(self) -> "EEGDevice":
Expand All @@ -43,4 +71,3 @@ def __exit__(
) -> None:
self._logger.debug("Exiting context manager...")
self.disconnect()
return None
10 changes: 9 additions & 1 deletion bridge/eeg/core/device_data.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
from dataclasses import dataclass

from bridge.eeg.core.typing import EEGArray

@dataclass

@dataclass(frozen=True, slots=True, kw_only=True)
class DeviceData:
mac_address: str | None = None
name: str | None = None
manufacturer: str | None = None
electrodes_num: int | None = None
sample_rate: int | None = None


@dataclass(frozen=True, slots=True, kw_only=True)
class RecordingFrame:
timestamp: float
data: EEGArray
4 changes: 3 additions & 1 deletion bridge/eeg/core/typing.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from typing import TypeAlias

import numpy as np
from numpy.typing import NDArray

EEGArray = NDArray[np.float64]
EEGArray: TypeAlias = NDArray[np.float64]
4 changes: 4 additions & 0 deletions bridge/eeg/fif/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .device import FifDevice
from .recorder import FifRecorder

__all__ = ["FifDevice", "FifRecorder"]
Loading
Loading