Search⌘ K
AI Features

Solution: Build a Sensor Monitoring System

Explore how to build a sensor monitoring system by defining sensor classes that simulate temperature, pressure, and vibration data. Learn to detect anomalies and generate alerts using concurrency with Python threads. Understand how to process sensor data, manage alert queues, and run concurrent sensor simulations effectively.

Implement sensor simulation

Define sensor classes (TemperatureSensor, PressureSensor, and VibrationSensor) that simulate sensor data and detect anomalies, producing an alert when a reading rises above that sensor's threshold and a follow-up message when it returns to normal.

Python 3.10.4
import random
import time
class Sensor:
    """Base class for the simulated sensors.

    Stores the alert bookkeeping shared by every concrete sensor and
    declares the two hooks (``generate_data`` and ``process_data``) that
    subclasses are required to override.
    """

    def __init__(self, sensor_id):
        """Create a sensor with no active alert and a zero alert count.

        Args:
            sensor_id: Identifier kept on the instance as ``sensor_id``.
        """
        self.sensor_id = sensor_id
        # Tag of the alert currently in effect; None means no active alert.
        self.last_alert = None
        # Running total of alerts counted for this sensor.
        self.alert_count = 0

    def generate_data(self):
        """Produce one reading; must be overridden by subclasses.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError("Subclasses must implement generate_data method")

    def process_data(self, data):
        """Evaluate one reading; must be overridden by subclasses.

        Args:
            data: The reading to evaluate (unused by the base class).

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError("Subclasses must implement process_data method")
class TemperatureSensor(Sensor):
    """Simulated temperature sensor that alerts on high readings."""

    # Bounds handed to random.uniform when simulating a reading.
    MIN_TEMPERATURE = 20
    MAX_TEMPERATURE = 30
    # Readings strictly above this value trigger the alert.
    HIGH_TEMPERATURE_THRESHOLD = 28
    # Value stored in ``last_alert`` while the alert is active.
    ALERT_TAG = "high_temperature"

    def generate_data(self):
        """Return a simulated reading.

        Returns:
            A dict with keys ``"sensor_id"`` and ``"temperature"``; the
            temperature is drawn uniformly between ``MIN_TEMPERATURE`` and
            ``MAX_TEMPERATURE``.
        """
        temperature = random.uniform(self.MIN_TEMPERATURE, self.MAX_TEMPERATURE)
        return {"sensor_id": self.sensor_id, "temperature": temperature}

    def process_data(self, data):
        """Check one reading against the threshold and update alert state.

        Args:
            data: Mapping containing ``"temperature"`` and ``"sensor_id"``.

        Returns:
            An alert message when the reading first exceeds the threshold,
            a recovery message when it first drops back to or below the
            threshold after an alert, otherwise ``None``.
        """
        temperature = data["temperature"]
        alert_active = self.last_alert == self.ALERT_TAG

        # Only the first reading above the threshold is reported; repeats
        # are suppressed until the reading has returned to normal.
        if temperature > self.HIGH_TEMPERATURE_THRESHOLD and not alert_active:
            self.last_alert = self.ALERT_TAG
            self.alert_count += 1
            return f"High temperature detected! Sensor {data['sensor_id']} reading: {temperature:.2f}°C"

        if temperature <= self.HIGH_TEMPERATURE_THRESHOLD and alert_active:
            self.last_alert = None
            return "Temperature back to normal."

        # No state change: nothing to report.
        return None
class PressureSensor(Sensor):
    """Simulated pressure sensor that alerts on high readings."""

    # Bounds handed to random.uniform when simulating a reading.
    MIN_PRESSURE = 100
    MAX_PRESSURE = 150
    # Readings strictly above this value trigger the alert.
    HIGH_PRESSURE_THRESHOLD = 130
    # Value stored in ``last_alert`` while the alert is active.
    ALERT_TAG = "high_pressure"

    def generate_data(self):
        """Return a simulated reading.

        Returns:
            A dict with keys ``"sensor_id"`` and ``"pressure"``; the
            pressure is drawn uniformly between ``MIN_PRESSURE`` and
            ``MAX_PRESSURE``.
        """
        pressure = random.uniform(self.MIN_PRESSURE, self.MAX_PRESSURE)
        return {"sensor_id": self.sensor_id, "pressure": pressure}

    def process_data(self, data):
        """Check one reading against the threshold and update alert state.

        Args:
            data: Mapping containing ``"pressure"`` and ``"sensor_id"``.

        Returns:
            An alert message when the reading first exceeds the threshold,
            a recovery message when it first drops back to or below the
            threshold after an alert, otherwise ``None``.
        """
        pressure = data["pressure"]
        alert_active = self.last_alert == self.ALERT_TAG

        # Only the first reading above the threshold is reported; repeats
        # are suppressed until the reading has returned to normal.
        if pressure > self.HIGH_PRESSURE_THRESHOLD and not alert_active:
            self.last_alert = self.ALERT_TAG
            self.alert_count += 1
            return f"High pressure detected! Sensor {data['sensor_id']} reading: {pressure:.2f} psi"

        if pressure <= self.HIGH_PRESSURE_THRESHOLD and alert_active:
            self.last_alert = None
            return "Pressure back to normal."

        # No state change: nothing to report.
        return None
class VibrationSensor(Sensor):
    """Simulated vibration sensor that alerts on excessive readings."""

    # Bounds handed to random.uniform when simulating a reading.
    MIN_VIBRATION = 0.5
    MAX_VIBRATION = 2.5
    # Readings strictly above this value trigger the alert.
    EXCESSIVE_VIBRATION_THRESHOLD = 2.0
    # Value stored in ``last_alert`` while the alert is active.
    ALERT_TAG = "excessive_vibration"

    def generate_data(self):
        """Return a simulated reading.

        Returns:
            A dict with keys ``"sensor_id"`` and ``"vibration"``; the
            vibration is drawn uniformly between ``MIN_VIBRATION`` and
            ``MAX_VIBRATION``.
        """
        vibration = random.uniform(self.MIN_VIBRATION, self.MAX_VIBRATION)
        return {"sensor_id": self.sensor_id, "vibration": vibration}

    def process_data(self, data):
        """Check one reading against the threshold and update alert state.

        Args:
            data: Mapping containing ``"vibration"`` and ``"sensor_id"``.

        Returns:
            An alert message when the reading first exceeds the threshold,
            a recovery message when it first drops back to or below the
            threshold after an alert, otherwise ``None``.
        """
        vibration = data["vibration"]
        alert_active = self.last_alert == self.ALERT_TAG

        # Only the first reading above the threshold is reported; repeats
        # are suppressed until the reading has returned to normal.
        if vibration > self.EXCESSIVE_VIBRATION_THRESHOLD and not alert_active:
            self.last_alert = self.ALERT_TAG
            self.alert_count += 1
            return f"Excessive vibration detected! Sensor {data['sensor_id']} reading: {vibration:.2f}"

        if vibration <= self.EXCESSIVE_VIBRATION_THRESHOLD and alert_active:
            self.last_alert = None
            return "Vibration back to normal."

        # No state change: nothing to report.
        return None

Code explanation

  • ...