Skip to content

Commit 8613142

Browse files
committed
Implement MQTT client setup and sensor data logging; refactor connection logic and enhance message handling
1 parent 7521b3c commit 8613142

3 files changed

Lines changed: 88 additions & 27 deletions

File tree

app/core/config.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from sqlalchemy.orm import Session
2+
from app.core.database import SessionLocal
3+
from app.models.device import SensorLog
4+
5+
# --- HAPUS BARIS INI JIKA ADA: ---
6+
# from app.core.config import settings <-- INI BIANG KEROKNYA, HAPUS!
7+
8+
# Fungsi pembantu untuk simpan ke DB
9+
def save_sensor_data(device_id: str, topic_type: str, payload: str):
10+
db: Session = SessionLocal()
11+
try:
12+
new_log = SensorLog(
13+
device_id=device_id,
14+
topic_type=topic_type,
15+
value=payload
16+
)
17+
db.add(new_log)
18+
db.commit()
19+
print(f"💾 Saved: [{device_id}] {topic_type} -> {payload}")
20+
except Exception as e:
21+
print(f"❌ DB Error: {e}")
22+
db.rollback()
23+
finally:
24+
db.close()
25+
26+
# Callback utama saat ada pesan MQTT masuk
27+
def on_message(client, userdata, msg):
28+
try:
29+
topic = msg.topic
30+
payload = msg.payload.decode()
31+
32+
# Format Topik: alat/{DEVICE_ID}/status/{TIPE}
33+
parts = topic.split("/")
34+
35+
if len(parts) == 4 and parts[0] == "alat" and parts[2] == "status":
36+
device_id = parts[1]
37+
sensor_type = parts[3]
38+
39+
save_sensor_data(device_id, sensor_type, payload)
40+
else:
41+
print(f"⚠️ Topik tidak dikenal: {topic}")
42+
43+
except Exception as e:
44+
print(f"Error processing MQTT: {e}")

app/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
@app.on_event("startup")
3030
async def startup_event():
3131
print("🚀 Menyalakan Mesin MQTT...")
32-
connect_mqtt()
32+
start_mqtt()
3333

3434
# --- ROOT ENDPOINT (Contoh penggunaan di main.py) ---
3535
@app.get("/")

app/mqtt/client.py

Lines changed: 43 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,52 @@
11
import paho.mqtt.client as mqtt
2-
import os
3-
import time
4-
from app.mqtt.handlers import on_message
2+
# Jika kamu belum punya app.core.config, kita hardcode dulu credentials-nya
3+
# from app.core.config import settings
4+
from app.mqtt.callbacks import on_message
55

6-
# Ambil credential dari .env
7-
MQTT_BROKER = os.getenv("MQTT_BROKER")
8-
MQTT_PORT = int(os.getenv("MQTT_PORT", 1883))
9-
MQTT_USER = os.getenv("MQTT_USER")
10-
MQTT_PASSWORD = os.getenv("MQTT_PASSWORD")
6+
# Variabel Global Client
7+
mqtt_client = mqtt.Client(client_id="Backend_Listener_Worker", protocol=mqtt.MQTTv311)
118

12-
# Inisialisasi Client
13-
mqtt_client = mqtt.Client()
9+
def start_mqtt():
10+
"""
11+
Fungsi ini dipanggil oleh main.py saat server start.
12+
Tugasnya konek ke Broker dan mulai loop listener.
13+
"""
14+
# 1. Setup Auth (Sesuaikan dengan kredensial EMQX kamu)
15+
MQTT_BROKER = "k2519aa6.ala.asia-southeast1.emqxsl.com"
16+
MQTT_PORT = 8883
17+
MQTT_USER = "PCB01"
18+
MQTT_PASS = "5ywnMzsVX4Ss9vH"
1419

15-
# Setup Callback (Aksi ketika connect & terima pesan)
16-
def on_connect(client, userdata, flags, rc):
17-
if rc == 0:
18-
print("✅ BERHASIL Konek ke MQTT Broker!")
19-
# Subscribe ke semua topik alat (Wildcard #)
20-
# Artinya: Dengerin semua chat di channel "alat/..."
21-
client.subscribe("alat/#")
22-
else:
23-
print(f"❌ Gagal Konek, Return Code: {rc}")
20+
print(f"🔌 Menghubungkan ke MQTT Broker: {MQTT_BROKER}...")
21+
22+
# Set Username & Password
23+
mqtt_client.username_pw_set(MQTT_USER, MQTT_PASS)
24+
25+
# Wajib SSL untuk port 8883
26+
mqtt_client.tls_set()
2427

25-
# Pasang fungsi-fungsinya
26-
mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
27-
mqtt_client.on_connect = on_connect
28-
mqtt_client.on_message = on_message # Panggil handler yang kita buat di Langkah 2
28+
# 2. Hubungkan logic Callback (PENTING!)
29+
mqtt_client.on_message = on_message
2930

30-
def connect_mqtt():
31+
# 3. Konek & Subscribe
3132
try:
3233
mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
33-
mqtt_client.loop_start() # Jalan di background thread
34+
35+
# Subscribe ke Topik Dinamis
36+
mqtt_client.subscribe("alat/+/status/#")
37+
38+
# Jalankan di background thread
39+
mqtt_client.loop_start()
40+
41+
print("✅ MQTT Listener Berjalan! Mendengarkan: alat/+/status/#")
42+
3443
except Exception as e:
35-
print(f"⚠️ Error Koneksi MQTT: {e}")
44+
print(f"❌ Gagal Konek MQTT: {e}")
45+
46+
# Fungsi helper untuk publish (dipakai nanti buat control)
47+
def publish(topic: str, payload: str):
48+
if mqtt_client.is_connected():
49+
mqtt_client.publish(topic, payload)
50+
print(f"📡 Published: {topic} -> {payload}")
51+
else:
52+
print("⚠️ MQTT belum terhubung, gagal publish.")

0 commit comments

Comments
 (0)