Skip to content

Commit ffc6689

Browse files
committed
Refactor MQTT client setup; streamline connection logic and enhance message handling
1 parent 8613142 commit ffc6689

3 files changed

Lines changed: 54 additions & 54 deletions

File tree

app/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
# Import Module Kita
99
from app.core.database import engine, get_db, Base
1010
from app.models.device import Device
11-
from app.mqtt.client import connect_mqtt
11+
from app.mqtt.client import start_mqtt
1212
from app.api.v1.devices import router as device_router
1313
from app.core.limiter import limiter
1414

app/mqtt/callbacks.py

Lines changed: 40 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,47 @@
1-
import paho.mqtt.client as mqtt
2-
from app.core.config import settings # Asumsi config ada di sini
3-
from app.mqtt.callbacks import on_message # Import logika langkah 2
1+
from sqlalchemy.orm import Session
2+
from app.core.database import SessionLocal
3+
from app.models.device import SensorLog
44

5-
# Global variable buat client
6-
mqtt_client = mqtt.Client(client_id="Backend_Listener_Worker", protocol=mqtt.MQTTv311)
7-
8-
def start_mqtt():
9-
# 1. Setup Auth (Sesuaikan dengan kredensial EMQX kamu)
10-
# Lebih baik ambil dari .env/config, tapi hardcode dulu buat test gapapa
11-
MQTT_BROKER = "k2519aa6.ala.asia-southeast1.emqxsl.com"
12-
MQTT_PORT = 8883
13-
MQTT_USER = "PCB01"
14-
MQTT_PASS = "5ywnMzsVX4Ss9vH"
15-
16-
mqtt_client.username_pw_set(MQTT_USER, MQTT_PASS)
17-
mqtt_client.tls_set() # Wajib SSL untuk port 8883
18-
19-
# 2. Hubungkan logic Callback
20-
mqtt_client.on_message = on_message
5+
# --- FUNGSI SIMPAN KE DB ---
6+
def save_sensor_data(device_id: str, topic_type: str, payload: str):
7+
db: Session = SessionLocal()
8+
try:
9+
new_log = SensorLog(
10+
device_id=device_id,
11+
topic_type=topic_type,
12+
value=payload
13+
)
14+
db.add(new_log)
15+
db.commit()
16+
print(f"💾 Saved DB: [{device_id}] {topic_type} -> {payload}")
17+
except Exception as e:
18+
print(f"❌ Gagal Simpan DB: {e}")
19+
db.rollback()
20+
finally:
21+
db.close()
2122

22-
# 3. Konek
23+
# --- CALLBACK SAAT PESAN MASUK ---
24+
def on_message(client, userdata, msg):
2325
try:
24-
mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
26+
topic = msg.topic
27+
payload = msg.payload.decode()
2528

26-
# 4. Subscribe Dynamic Topic
27-
# Tanda '+' adalah wildcard.
28-
# Artinya kita dengar semua alat di topik status
29-
mqtt_client.subscribe("alat/+/status/#")
29+
# LOGIKA MEMBEDAH TOPIK DINAMIS
30+
# Format: alat/{DEVICE_ID}/status/{TIPE}
31+
# Contoh: alat/441D64BE2208/status/suhu
3032

31-
mqtt_client.loop_start() # Jalankan di background thread
32-
print("✅ MQTT Listener Berjalan! Mendengarkan: alat/+/status/#")
33+
parts = topic.split("/")
3334

35+
# Validasi: Harus ada 4 bagian, depannya 'alat', tengahnya 'status'
36+
if len(parts) == 4 and parts[0] == "alat" and parts[2] == "status":
37+
device_id = parts[1] # Ambil ID (441D...)
38+
sensor_type = parts[3] # Ambil Tipe (suhu/ldr/lampu)
39+
40+
# Simpan ke Database
41+
save_sensor_data(device_id, sensor_type, payload)
42+
43+
else:
44+
print(f"⚠️ Topik tidak sesuai format: {topic}")
45+
3446
except Exception as e:
35-
print(f"❌ Gagal Konek MQTT: {e}")
47+
print(f"Error processing MQTT: {e}")

app/mqtt/client.py

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,40 @@
11
import paho.mqtt.client as mqtt
2-
# Jika kamu belum punya app.core.config, kita hardcode dulu credentials-nya
3-
# from app.core.config import settings
42
from app.mqtt.callbacks import on_message
53

6-
# Variabel Global Client
4+
# Global Client
75
mqtt_client = mqtt.Client(client_id="Backend_Listener_Worker", protocol=mqtt.MQTTv311)
86

97
def start_mqtt():
10-
"""
11-
Fungsi ini dipanggil oleh main.py saat server start.
12-
Tugasnya konek ke Broker dan mulai loop listener.
13-
"""
14-
# 1. Setup Auth (Sesuaikan dengan kredensial EMQX kamu)
8+
# 1. SETUP KREDENSIAL (Hardcode dulu biar aman dari error import)
159
MQTT_BROKER = "k2519aa6.ala.asia-southeast1.emqxsl.com"
1610
MQTT_PORT = 8883
1711
MQTT_USER = "PCB01"
1812
MQTT_PASS = "5ywnMzsVX4Ss9vH"
1913

20-
print(f"🔌 Menghubungkan ke MQTT Broker: {MQTT_BROKER}...")
14+
print(f"🔌 Menghubungkan ke EMQX: {MQTT_BROKER}...")
2115

22-
# Set Username & Password
16+
# 2. CONFIG CLIENT
2317
mqtt_client.username_pw_set(MQTT_USER, MQTT_PASS)
24-
25-
# Wajib SSL untuk port 8883
26-
mqtt_client.tls_set()
18+
mqtt_client.tls_set() # WAJIB SSL KARENA PORT 8883
19+
mqtt_client.on_message = on_message # Sambungkan ke otak logic tadi
2720

28-
# 2. Hubungkan logic Callback (PENTING!)
29-
mqtt_client.on_message = on_message
30-
31-
# 3. Konek & Subscribe
21+
# 3. CONNECT & SUBSCRIBE
3222
try:
3323
mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
3424

35-
# Subscribe ke Topik Dinamis
25+
# SUBSCRIBE WILDCARD
26+
# "+" artinya ID alat apa saja
27+
# "#" artinya status apa saja (suhu, ldr, dll)
3628
mqtt_client.subscribe("alat/+/status/#")
3729

38-
# Jalankan di background thread
39-
mqtt_client.loop_start()
40-
30+
mqtt_client.loop_start() # Jalan di background
4131
print("✅ MQTT Listener Berjalan! Mendengarkan: alat/+/status/#")
4232

4333
except Exception as e:
4434
print(f"❌ Gagal Konek MQTT: {e}")
4535

46-
# Fungsi helper untuk publish (dipakai nanti buat control)
36+
# Fungsi helper untuk control (dipakai nanti)
4737
def publish(topic: str, payload: str):
4838
if mqtt_client.is_connected():
4939
mqtt_client.publish(topic, payload)
50-
print(f"📡 Published: {topic} -> {payload}")
51-
else:
52-
print("⚠️ MQTT belum terhubung, gagal publish.")
40+
print(f"📡 Backend Kirim: {topic} -> {payload}")

0 commit comments

Comments
 (0)