UNLV

[UNLV] - 데이터 엔지니어링과 MQTT 스트리밍(7일차)

빡성 2026. 2. 13. 02:20

1. 수업 개요: 데이터 엔지니어링과 스트리밍

이번 수업은 데이터 엔지니어링(Data Engineering)과 실시간 데이터 스트리밍에 대해 다뤘다. 데이터 엔지니어링은 분석 및 의사 결정을 위해 원시 데이터를 수집, 처리하고 구조화된 형식으로 변환하는 프로세스와 기술을 포함한다.

특히 IoT 환경에서 여러 센서로부터 실시간으로 데이터를 수집하고 처리하는 방법을 MQTT(Message Queuing Telemetry Transport) 프로토콜을 통해 실습했다. MQTT는 경량 메시징 프로토콜로, IoT 디바이스 간 통신에 최적화되어 있다.

실습에서는 M5Stack Core2와 Raspberry Pi Zero W2를 사용하여 엔드 투 엔드 IoT 데이터 파이프라인을 구축했다. 센서 데이터를 MQTT로 발행하고, Python으로 구독하여 CSV 파일로 저장하고 시각화하는 전체 과정을 경험했다.

2. 데이터 엔지니어링의 정의와 프로세스

데이터 엔지니어링은 원시 데이터를 수집, 처리 및 구조화되고 사용 가능한 형식으로 변환하는 프로세스와 기술을 포함한다. 머신러닝 모델이 제대로 작동하려면 고품질의 데이터가 필요하며, 데이터 엔지니어링은 이를 보장하는 핵심 단계다.

데이터 엔지니어링의 주요 프로세스는 다음과 같다:

  • 수집 (Gathering / Collection): 다양한 소스로부터 데이터를 가져온다.
  • 정제 (Refinement): 데이터를 정제하고 변환한다.
  • 유지 관리 (Sustainment): 데이터 파이프라인을 지속적으로 관리한다.
  • 처리 (Processing): 데이터를 분석 가능한 형태로 가공한다.
  • 검증 (Validation): 데이터 품질을 확인한다.
  • 저장소 (Storage): 데이터를 적절한 저장소에 저장한다.
  • 보안 (Security): 데이터 접근 권한과 프라이버시를 관리한다.
  • 증강 (Augmentation): 데이터에 추가 정보를 결합한다.
  • 라벨링 (Labeling): 지도 학습을 위한 레이블을 생성한다.
  • 버전 관리 (Versioning): 데이터셋의 버전을 추적한다.

데이터 엔지니어링에서 중요한 것은 요구사항 정의다. 문제 정의, 권한 및 권리, 기계 및 인간이 사용 가능한 형식 등을 명확히 해야 한다.

3. 데이터 수집 (Data Collection)

데이터 수집은 데이터 엔지니어링의 첫 단계다. 다양한 소스로부터 원시 데이터를 가져와야 한다.

데이터 소스 (Sources of Data):

  • 센서: IoT, IMU, 환경, 의료, 산업용 센서
  • 로그: 시스템, 애플리케이션, 클릭스트림, 텔레메트리
  • API: 금융, 날씨, 소셜 미디어 등 외부 API
  • 데이터베이스: SQL/NoSQL 엑스포트
  • 사용자 생성 데이터: 이미지, 텍스트, 라벨, 크라우드소싱
  • 공공 데이터셋: UCI, Kaggle, OpenML

수집 방법 (Collection Methods):

  • 배치 데이터 캡처: CSV/JSON 덤프 등 주기적 수집
  • 스트리밍 데이터 캡처: Kafka, MQTT, RabbitMQ 등 실시간 수집
  • 엣지/임베디드 수집: TinyML 장치, 모바일 앱

도전 과제 (Challenges):

  • 데이터 라벨링 및 주석: 수동, 반자동, 능동 학습
  • 데이터 프라이버시 및 윤리: 동의, 익명화
  • 데이터 품질: 누락, 노이즈, 중복, 편향

4. MQTT 프로토콜과 Pub/Sub 모델

MQTT는 IoT 환경에서 널리 사용되는 경량 메시징 프로토콜이다. Publish/Subscribe (Pub/Sub) 모델을 사용하여 데이터 소스와 데이터 소비자를 분리(Decouple)한다.

전통적인 "Polling" 방식과 달리 MQTT는 "Push" 모델을 사용한다. 장치는 데이터가 있을 때만 전송하고 서버는 즉시 수신한다. 이는 배터리 소비를 줄이고 실시간성을 향상시킨다.

MQTT 데이터 흐름 파이프라인:

  1. 수집 (Publishers): 센서(엔드 디바이스)가 깨어나 데이터를 읽고 특정 Topic(예: factory/machine_01/temp)으로 페이로드를 발행한다.
  2. 전송 (Broker): 중앙 MQTT 브로커(예: Mosquitto)가 메시지를 수신하고, 해당 토픽을 구독하는 대상에게 데이터를 필터링하여 라우팅한다.
  3. 처리 (Subscribers): 백엔드 시스템(DB, 대시보드, AI 모델)이 관심 토픽을 구독하여 저장 또는 분석을 위해 실시간으로 수신한다.

MQTT의 장점은 다음과 같다:

  • 경량 프로토콜로 저전력 디바이스에 적합
  • Pub/Sub 모델로 확장성이 뛰어남
  • Topic 기반 필터링으로 효율적인 메시지 라우팅
  • QoS(Quality of Service) 레벨로 신뢰성 보장

5. 데이터 전처리 (Data Pre-processing)

데이터 과학자는 시간의 50% 이상을 데이터 전처리에 소비한다. 데이터 수집은 두 번째로 많은 시간이 소요되는 구성 요소이며, 알고리즘 튜닝은 적은 부분을 차지한다.

시간 소요 비중:

  • 데이터 전처리 (Data preprocessing): 60%
  • 데이터 수집 (Collecting Data): 19%
  • 데이터 패턴 인식 (Recognizing Data Patterns): 9%
  • 학습 데이터 구축 (Constructing Training Data): 4%
  • 알고리즘 튜닝 (Tuning Algorithm): 3%
  • 기타 (Others): 5%

왜 데이터 전처리를 하는가?

원시 데이터는 다음과 같은 문제를 포함할 수 있다:

  • 불완전함 (Incomplete): 속성이 부족하거나 결측값이 포함됨
  • 노이즈 (Noisy): 이상치와 같은 잘못된 레코드 포함
  • 불일치 (Inconsistent): 충돌하는 레코드나 불일치 발생
  • 결측값 (Missing values): 일부 속성이 비어있거나 NULL임
  • 유효하지 않은 값 (Invalid Values): 성별 등 속성에 잘못된 값 입력
  • 고유성 (Uniqueness): 동일 식별자의 반복된 값
  • 오타 (Misspellings): 잘못 적힌 값들

데이터 전처리 기법:

  • 결측값 (Missing Values): 제거 또는 대체(평균, 중앙값, 보간법 등)로 처리
  • 이상치 (Outliers): 모델 왜곡을 방지하기 위해 극단값을 감지하고 처리
  • 데이터 스케일링 (Data Scaling): 모든 변수가 동등하게 기여하도록 정규화 또는 표준화
    • 정규화 (Normalization): 데이터를 0-1 사이의 고정 범위로 재조정
    • 표준화 (Standardization): 평균 0, 분산 1로 변환 (z-score)
  • 다운샘플링 (Down-sampling): 계산/저장 비용을 줄이기 위해 데이터 속도나 샘플 수 감소
  • 업샘플링 (Up-sampling): 데이터 복제 또는 합성을 통해 샘플 속도를 높이거나 클래스 불균형 해소
  • 노이즈 필터링 (Noise Filtering): 센서 데이터에서 원치 않는 고주파 또는 무작위 변동 제거
  • 평활화 (Smoothing): 이동 평균 등을 적용하여 단기 변동 감소
  • 인코딩 (Encoding): 범주형 데이터를 수치 표현(One-hot 등)으로 변환

6. Data Drift와 Concept Drift

머신러닝 모델을 배포한 후에도 데이터는 계속 변화한다. 이러한 변화를 감지하고 대응하는 것이 중요하다.

Data Drift: 작업(Task)은 변하지 않지만, 시간이 지남에 따라 입력 데이터의 통계적 분포가 변하는 현상이다. 예를 들어, 센서가 노화되거나 환경이 변하면 데이터 분포가 달라질 수 있다.

Concept Drift: 시간이 지남에 따라 입력 데이터와 타겟 라벨 사이의 근본적인 관계가 변하는 현상이다. 예를 들어, 사용자 행동 패턴이 바뀌면 모델의 결정 경계가 더 이상 유효하지 않을 수 있다.

비교:

  • Data Drift: 입력 피처(x)의 분포 변화
  • Concept Drift: 결정 경계(Decision boundary)의 변화, 피처-라벨 관계 변화

두 현상 모두 모델 성능 저하를 야기할 수 있으므로, 지속적인 모니터링과 재학습이 필요하다.

7. 실습: M5Core2 + AHT20 센서로 MQTT 발행

실습에서는 M5Stack Core2에 AHT20 온습도 센서를 연결하고, 센서 데이터를 MQTT 브로커로 발행하는 시스템을 구축했다.

하드웨어 구성:

  • M5Stack Core2 (Wi-Fi 지원)
  • AHT20 온습도 센서 (I2C 통신)
  • Raspberry Pi Zero W2 (MQTT 브로커)

배선 (AHT20 to M5Core2 Port A):

  • GND (Black) → GND
  • 5V (Red) → VIN/VCC
  • G32 (White) → SDA
  • G33 (Yellow) → SCL

필요한 라이브러리:

  • M5Unified (M5Stack 통합 라이브러리)
  • PubSubClient (MQTT 클라이언트)
  • ArduinoJson (JSON 처리)
  • Adafruit AHTX0 (AHT20 센서 드라이버)
M5Core2 AHT20 MQTT Publisher (M5Core2_AHT20_MQTT.ino)
#include <M5Unified.h>
#include <WiFi.h>
#include <PubSubClient.h>
#include <ArduinoJson.h>
#include <Adafruit_AHTX0.h>

// --- USER CONFIGURATION ---
const char* ssid = "YOUR_WIFI_SSID";
const char* password = "YOUR_WIFI_PASSWORD";
const char* mqtt_server = "192.168.1.100"; // Replace with your RPi IP
const int mqtt_port = 1883;
const char* location = "Lab_M5_Unit1"; 
// --------------------------

WiFiClient espClient;
PubSubClient client(espClient);
Adafruit_AHTX0 aht;

void setup_wifi() {
  delay(10);
  M5.Display.print("Connecting to WiFi...");
  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    M5.Display.print(".");
  }
  M5.Display.println("\nWiFi connected");
}

void reconnect() {
  while (!client.connected()) {
    M5.Display.print("Attempting MQTT connection...");
    String clientId = "M5Core2-";
    clientId += String((uint32_t)ESP.getEfuseMac(), HEX);
    
    if (client.connect(clientId.c_str())) {
      M5.Display.println("connected");
    } else {
      M5.Display.printf("failed, rc=%d try again in 5s\n", client.state());
      delay(5000);
    }
  }
}

void setup() {
  M5.begin();
  M5.Display.setTextSize(2);
  M5.Display.println("Init AHT20 & MQTT...");

  // Initialize I2C for Port A
  Wire.begin(32, 33); 

  if (!aht.begin(&Wire)) {
    M5.Display.println("Could not find AHT? Check wiring");
    while (1) delay(10);
  }
  M5.Display.println("AHT20 Found");

  setup_wifi();
  client.setServer(mqtt_server, mqtt_port);
}

void loop() {
  M5.update();
  
  if (!client.connected()) {
    reconnect();
  }
  client.loop();

  // 1. Get Sensor Data
  sensors_event_t humidity, temp;
  aht.getEvent(&humidity, &temp);

  float t_val = temp.temperature;
  float h_val = humidity.relative_humidity;

  // 2. Create JSON Payload
  StaticJsonDocument<256> doc;
  doc["loc"] = location;
  doc["temp"] = serialized(String(t_val, 2));
  doc["hum"] = serialized(String(h_val, 2));

  char buffer[256];
  serializeJson(doc, buffer);

  // 3. Publish
  client.publish("lab/temphum", buffer);

  // 4. Visual Feedback
  M5.Display.fillScreen(TFT_BLACK);
  M5.Display.setCursor(10, 50);
  M5.Display.printf("Loc: %s\n", location);
  M5.Display.printf("Temp: %.2f C\n", t_val);
  M5.Display.printf("Hum:  %.2f %%", h_val);
  
  delay(5000); 
}

코드의 주요 기능:

  • Wi-Fi 연결 및 MQTT 브로커 연결
  • AHT20 센서로부터 온도와 습도 데이터 읽기
  • JSON 형식으로 데이터 포맷팅
  • "lab/temphum" 토픽으로 데이터 발행
  • M5Stack 디스플레이에 현재 값 표시

8. 실습: Python MQTT 구독자 및 데이터 로깅

Raspberry Pi Zero W2에서 Mosquitto MQTT 브로커를 설정하고, Python 스크립트로 M5Core2에서 발행한 데이터를 구독하여 CSV 파일로 저장하는 실습을 진행했다.

MQTT Broker 설정 (Raspberry Pi):

Mosquitto 설치 및 설정
# 1. 시스템 업데이트
sudo apt update && sudo apt upgrade -y

# 2. Mosquitto 설치
sudo apt install mosquitto mosquitto-clients -y

# 3. Mosquitto 서비스 시작 및 자동 시작 설정
sudo systemctl enable mosquitto
sudo systemctl start mosquitto

# 4. 상태 확인
systemctl status mosquitto

# 5. 원격 액세스 활성화
sudo nano /etc/mosquitto/mosquitto.conf
# 다음 내용 추가:
# listener 1883
# allow_anonymous true

# 6. Mosquitto 재시작
sudo systemctl restart mosquitto

# 7. 포트 확인
ss -tulpn | grep 1883

# 8. 브로커 테스트
# 터미널 1 (구독):
mosquitto_sub -h localhost -t lab/temphum

# 터미널 2 (발행):
mosquitto_pub -h localhost -t lab/temphum -m '{"temp":25.5,"hum":60.2}'

Python MQTT 구독자:

MQTT Consumer (mqtt_loc_consumer.py)
import json
import csv
from datetime import datetime
import paho.mqtt.client as mqtt

# Configuration
TOPIC = "lab/temphum"
CSV_FILE = "temphum_log.csv"
BROKER_ADDRESS = "localhost"

# Initialize CSV with header if file doesn't exist
try:
    with open(CSV_FILE, "r") as f:
        pass
except FileNotFoundError:
    with open(CSV_FILE, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["timestamp", "location", "temp", "hum"])
    print(f"Created new log file: {CSV_FILE}")

def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")
    client.subscribe(TOPIC)
    print(f"Listening on topic: {TOPIC}")
    print("-" * 65)
    print(f"{'TIMESTAMP':<22} | {'LOCATION':<15} | {'TEMP':<8} | {'HUM':<8}")
    print("-" * 65)

def on_message(client, userdata, msg):
    payload = msg.payload.decode()
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    try:
        data = json.loads(payload)
        loc = data.get("loc", "Unknown")
        temp = float(data.get("temp", 0.0))
        hum = float(data.get("hum", 0.0))

        # Display on Terminal
        print(f"{ts:<22} | {loc:<15} | {temp:<8.2f} | {hum:<8.2f}")

        # Save to CSV
        with open(CSV_FILE, "a", newline="") as f:
            writer = csv.writer(f)
            writer.writerow([ts, loc, temp, hum])

    except json.JSONDecodeError:
        print(f"Received non-JSON message: {payload}")
    except Exception as e:
        print(f"Error processing message: {e}")

# Setup MQTT Client
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

try:
    client.connect(BROKER_ADDRESS, 1883, 60)
    client.loop_forever()
except KeyboardInterrupt:
    print("\nDisconnecting...")
    client.disconnect()
except Exception as e:
    print(f"Connection failed: {e}")

이 스크립트는 MQTT 메시지를 수신하여 터미널에 표시하고, 동시에 CSV 파일로 저장한다. 실시간으로 데이터가 수집되는 것을 확인할 수 있었다.

9. 실습: 다중 디바이스 데이터 집계

여러 M5Core2 디바이스로부터 받은 데이터의 평균을 계산하는 실습을 진행했다. 이는 분산 센서 네트워크에서 중앙 집중식 데이터 집계의 예시다.

MQTT Consumer with Average Calculation (mqtt_consumer_avg.py)
import json
import csv
import sys
from datetime import datetime
import paho.mqtt.client as mqtt

TOPIC = "lab/temphum"
CSV_FILE = "temphum_log.csv"
BROKER_ADDRESS = "localhost"

# Global dictionary to store running stats per location
location_stats = {}

def on_connect(client, userdata, flags, rc):
    print(f"Connected to MQTT Broker (Result: {rc})")
    client.subscribe(TOPIC)
    print(f"Listening on topic: {TOPIC}")
    print("=" * 90)
    print(f"{'TIMESTAMP':<20} | {'LOCATION':<12} | {'TEMP (C)':<18} | {'HUMIDITY (%)':<18}")
    print(f"{'':<20} | {'':<12} | {'Current (Avg)':<18} | {'Current (Avg)':<18}")
    print("=" * 90)

def on_message(client, userdata, msg):
    try:
        payload = msg.payload.decode()
        data = json.loads(payload)

        loc = data.get("loc", "Unknown")
        temp = float(data.get("temp", 0.0))
        hum = float(data.get("hum", 0.0))
        ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Update running averages
        if loc not in location_stats:
            location_stats[loc] = {"temp_sum": 0.0, "hum_sum": 0.0, "count": 0}

        stats = location_stats[loc]
        stats["temp_sum"] += temp
        stats["hum_sum"] += hum
        stats["count"] += 1

        # Calculate Averages
        avg_temp = stats["temp_sum"] / stats["count"]
        avg_hum = stats["hum_sum"] / stats["count"]

        # Display output: "25.0 (24.8)"
        temp_str = f"{temp:.1f} ({avg_temp:.1f})"
        hum_str = f"{hum:.1f} ({avg_hum:.1f})"

        print(f"{ts:<20} | {loc:<12} | {temp_str:<18} | {hum_str:<18}")

        # Save to CSV
        with open(CSV_FILE, "a", newline="") as f:
            writer = csv.writer(f)
            writer.writerow([ts, loc, temp, hum, round(avg_temp, 2), round(avg_hum, 2)])

    except json.JSONDecodeError:
        print(f"Error: Received non-JSON payload: {msg.payload}")
    except Exception as e:
        print(f"Error processing message: {e}")

# Setup MQTT Client
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

try:
    client.connect(BROKER_ADDRESS, 1883, 60)
    client.loop_forever()
except KeyboardInterrupt:
    print("\nDisconnecting...")
    client.disconnect()
except Exception as e:
    print(f"Connection failed: {e}")

이 스크립트는 각 위치별로 누적 평균을 계산하여 표시한다. 현재 값과 평균 값을 함께 보여주어 데이터의 추세를 파악할 수 있다.

10. 실습: RPi + PCT2075 센서로 MQTT 발행

Raspberry Pi Zero W2에 PCT2075 온도 센서를 연결하고, Python으로 센서 데이터를 읽어 MQTT 브로커로 발행하는 실습을 진행했다. 이는 이종 네트워크(Microcontrollers 및 SBC)로부터 환경 모니터링을 중앙 집중화하는 예시다.

하드웨어 구성:

  • Raspberry Pi Zero W2
  • PCT2075 온도 센서 (I2C 통신)

배선 (RPi Zero W2 to PCT2075):

  • Pin 1 (3.3V) → VCC
  • Pin 3 (SDA) → SDA
  • Pin 5 (SCL) → SCL
  • Pin 6 (GND) → GND

필요한 설정:

  • I2C 활성화: sudo raspi-config → Interface Options → I2C
  • 라이브러리 설치: pip3 install adafruit-circuitpython-pct2075
RPi PCT2075 MQTT Publisher (rpi_publisher_pct2075.py)
import time
import json
import board
import adafruit_pct2075
import paho.mqtt.client as mqtt

# --- USER CONFIGURATION ---
MQTT_BROKER_IP = "192.168.1.100"
MQTT_PORT = 1883
MQTT_TOPIC = "lab/temphum"
LOCATION_NAME = "RPI_Node_PCT_East"
# --------------------------

# Initialize I2C bus
i2c = board.I2C()
sensor = adafruit_pct2075.PCT2075(i2c)

# MQTT Setup
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print(f"Connected to Master Broker at {MQTT_BROKER_IP}")
    else:
        print(f"Failed to connect, return code {rc}")

client = mqtt.Client()
client.on_connect = on_connect
client.connect(MQTT_BROKER_IP, MQTT_PORT, 60)
client.loop_start()

# Main Loop
try:
    while True:
        # Read Temperature
        current_temp = sensor.temperature

        # Prepare Payload (humidity is None for PCT2075)
        payload = {
            "loc": LOCATION_NAME,
            "temp": round(current_temp, 2),
            "hum": None
        }
        json_payload = json.dumps(payload)

        # Publish
        print(f"Sending: {json_payload}")
        client.publish(MQTT_TOPIC, json_payload)

        time.sleep(5)

except KeyboardInterrupt:
    print("\nStopping...")
    client.loop_stop()
    client.disconnect()

PCT2075는 온도 전용 센서이므로 습도 값은 None으로 처리한다. 이렇게 하면 M5Core2의 AHT20 데이터와 호환되는 JSON 구조를 유지할 수 있다.

11. 시뮬레이션: 합성 데이터 생성 및 전처리

실제 센서 데이터의 특성을 모방한 합성 데이터를 생성하고, 다양한 전처리 기법을 적용하여 비교하는 시뮬레이션을 진행했다.

합성 데이터 생성:

Synthetic Data Generation (generate_synthetic_data_mqtt.py)
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

def generate_and_visualize_data():
    # 7 days of data at 15-minute intervals
    dates = pd.date_range(start='2024-01-01', periods=24 * 4 * 7, freq='15min')
    n = len(dates)

    # Temperature: Daily cycle (sine wave) + random Gaussian noise
    temp_base = 20 + 10 * np.sin(np.linspace(0, 7 * 2 * np.pi, n))
    temp_noise = np.random.normal(0, 1.5, n)
    temperature = temp_base + temp_noise

    # Humidity: Inverse to temp + random noise
    humidity_base = 60 - 20 * np.sin(np.linspace(0, 7 * 2 * np.pi, n))
    humidity_noise = np.random.normal(0, 3, n)
    humidity = humidity_base + humidity_noise

    locations = np.random.choice(['Lab_A', 'Server_Room', 'Outdoor_Unit'], size=n)

    df = pd.DataFrame({
        'Timestamp': dates,
        'Location': locations,
        'Temperature': temperature,
        'Humidity': humidity
    })

    # Introduce Artifacts (Bad Data)
    # Missing Values (5% of data)
    nan_indices_t = np.random.choice(df.index, size=int(n * 0.05), replace=False)
    nan_indices_h = np.random.choice(df.index, size=int(n * 0.05), replace=False)
    df.loc[nan_indices_t, 'Temperature'] = np.nan
    df.loc[nan_indices_h, 'Humidity'] = np.nan

    # Outliers
    outlier_indices_t = np.random.choice(df.index, size=10, replace=False)
    df.loc[outlier_indices_t, 'Temperature'] = df.loc[outlier_indices_t, 'Temperature'] * 3

    outlier_indices_h = np.random.choice(df.index, size=5, replace=False)
    df.loc[outlier_indices_h, 'Humidity'] = -20

    # Save to CSV
    df.to_csv('synthetic_sensor_data.csv', index=False)
    print(f"Dataset saved to: synthetic_sensor_data.csv")

    return df

데이터 전처리 및 비교:

Data Processing Comparison (data_processing_comparison.py)
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from scipy import stats

def process_and_visualize():
    df = pd.read_csv('synthetic_sensor_data.csv')
    df['Timestamp'] = pd.to_datetime(df['Timestamp'])
    df_clean = df.copy()

    # 1. Handle Missing Values (Linear Interpolation)
    df_clean['Temperature'] = df_clean['Temperature'].interpolate(method='linear')
    df_clean['Humidity'] = df_clean['Humidity'].interpolate(method='linear')
    df_clean.dropna(inplace=True)

    # 2. Detect & Handle Outliers (Z-Score Method)
    for col in ['Temperature', 'Humidity']:
        z_scores = np.abs(stats.zscore(df_clean[col]))
        outliers = z_scores > 3
        df_clean.loc[outliers, col] = df_clean[col].median()

    # 3. Noise Filtering / Smoothing (Moving Average)
    df_clean['Temp_Smoothed'] = df_clean['Temperature'].rolling(window=4, center=True).mean().bfill().ffill()
    df_clean['Hum_Smoothed'] = df_clean['Humidity'].rolling(window=4, center=True).mean().bfill().ffill()

    # 4. Normalization (Min-Max Scaling)
    scaler_minmax = MinMaxScaler()
    df_clean[['Temp_Norm', 'Hum_Norm']] = scaler_minmax.fit_transform(df_clean[['Temp_Smoothed', 'Hum_Smoothed']])

    # 5. Standardization (Z-Score Scaling)
    scaler_std = StandardScaler()
    df_clean[['Temp_Std', 'Hum_Std']] = scaler_std.fit_transform(df_clean[['Temp_Smoothed', 'Hum_Smoothed']])

    # 6. Encoding Categorical Data (One-Hot Encoding)
    df_clean = pd.get_dummies(df_clean, columns=['Location'], prefix='Loc')

    # Save processed data
    df_clean.to_csv('processed_sensor_data.csv', index=False)
    print("Processed data saved to: processed_sensor_data.csv")

    return df_clean

이 시뮬레이션을 통해 다음을 확인했다:

  • 원시 데이터에 노이즈, 이상치, 결측값이 포함된 상태
  • 전처리 후 데이터가 부드럽고 일관성 있게 변환된 상태
  • 정규화와 표준화의 차이점
  • 이동 평균 필터링의 효과

Data Drift와 Concept Drift 시뮬레이션:

실제 환경에서 발생할 수 있는 Data Drift와 Concept Drift를 시뮬레이션하는 코드도 작성했다. 이 코드는 3단계로 구성된다:

  1. Baseline (정상 작동): 온도와 습도 간의 안정적인 관계
  2. Data Drift (센서 보정 드리프트): 온도 센서가 점진적으로 5도 상승하는 드리프트 발생. 관계는 유지되지만 센서 값이 변함
  3. Concept Drift (물리적 관계 변화): 온도와 습도 간의 관계가 반대로 변함 (음의 상관관계 → 양의 상관관계)

시뮬레이션에서는 KS-Test를 사용하여 Data Drift를 감지하고, 모델의 예측 오차(MAE)를 모니터링하여 Concept Drift를 감지한다. Drift가 감지되면 자동으로 스케일러를 업데이트하거나 모델을 재학습한다.

12. 정리

이번 수업을 통해 데이터 엔지니어링의 전체 파이프라인을 경험했다. MQTT 프로토콜을 사용한 실시간 데이터 수집부터 전처리까지의 과정을 직접 구현해볼 수 있었다.

특히 중요한 것은 "데이터 과학자는 시간의 50% 이상을 데이터 전처리에 소비한다"는 점이었다. 좋은 모델을 만들기 위해서는 고품질의 데이터가 필수이며, 이를 위해서는 체계적인 데이터 엔지니어링 프로세스가 필요하다.

MQTT의 Pub/Sub 모델은 IoT 환경에서 확장 가능한 데이터 파이프라인을 구축하는 데 매우 유용하다. 여러 디바이스로부터 데이터를 수집하고 중앙에서 집계하는 구조는 실제 산업 환경에서 널리 사용되는 패턴이다.

이 수업에서 다룬 내용은 앞으로 IoT 프로젝트나 센서 데이터 분석을 할 때 매우 유용할 것이다. 특히 MQTT를 활용한 실시간 데이터 수집과 전처리 기법을 이해한 것은 실무에서 큰 도움이 될 것 같다.