Add consumption curve to PV graph, fix text-curve overlap

This commit is contained in:
Joan
2026-03-20 13:33:20 +01:00
parent be3d23bb79
commit b038f2ee8f
2 changed files with 97 additions and 74 deletions

View File

@@ -118,8 +118,8 @@ def get_solar_status():
def get_solar_history():
"""Get today's solar production history from HA History API.
Returns list of (hour_float, watts) tuples."""
"""Get today's solar production and consumption history from HA History API.
Returns dict with 'production' and 'consumption' lists of (hour_float, watts)."""
headers = {
"Authorization": f"Bearer {HA_TOKEN}",
"Content-Type": "application/json",
@@ -127,35 +127,45 @@ def get_solar_history():
now = datetime.datetime.now()
start = now.replace(hour=0, minute=0, second=0, microsecond=0)
start_str = start.strftime("%Y-%m-%dT%H:%M:%S")
entities = f"{SOLAR_PRODUCTION_ENTITY},{SOLAR_LOAD_POWER_ENTITY}"
url = (f"{HASS_URL}/api/history/period/{start_str}"
f"?filter_entity_id={SOLAR_PRODUCTION_ENTITY}"
f"?filter_entity_id={entities}"
f"&minimal_response&no_attributes")
try:
response = requests.get(url, headers=headers, timeout=15)
if response.status_code != 200:
return []
return {"production": [], "consumption": []}
data = response.json()
if not data or not data[0]:
return []
utc_offset = now - datetime.datetime.utcnow()
def parse_entries(entries):
points = []
for entry in data[0]:
for entry in entries:
try:
state = float(entry.get("state", 0))
ts = entry.get("last_changed", "")
# Parse ISO timestamp
if "+" in ts:
ts = ts.split("+")[0]
elif ts.endswith("Z"):
ts = ts[:-1]
dt = datetime.datetime.fromisoformat(ts)
# Convert UTC to local time
utc_offset = now - datetime.datetime.utcnow()
dt_local = dt + utc_offset
hour_f = dt_local.hour + dt_local.minute / 60.0
points.append((hour_f, state))
except (ValueError, TypeError):
continue
return points
result = {"production": [], "consumption": []}
for entity_data in data:
if not entity_data:
continue
entity_id = entity_data[0].get("entity_id", "")
if entity_id == SOLAR_PRODUCTION_ENTITY:
result["production"] = parse_entries(entity_data)
elif entity_id == SOLAR_LOAD_POWER_ENTITY:
result["consumption"] = parse_entries(entity_data)
return result
except Exception as e:
print(f"Error fetching solar history: {e}")
return []
return {"production": [], "consumption": []}

View File

@@ -134,6 +134,7 @@ class Matrix64Display(SampleBase):
self.tesla = None
self.solar = None
self.pv_history = [] # list of (hour_float, watts) from HA history
self.consumption_history = [] # list of (hour_float, watts)
self.last_pv_history_update = 0
# View state
@@ -678,7 +679,7 @@ class Matrix64Display(SampleBase):
graphics.DrawText(canvas, small_font, 58, 62, label_color, "A")
def draw_view_6(self, canvas, fonts, colors):
"""Vista Curva PV - Today's photovoltaic production curve"""
"""Vista Curva PV - Today's production and consumption curves"""
time_font, data_font, small_font = fonts
time_color, humidity_color, hdd_color, label_color, line_color = colors
@@ -688,14 +689,14 @@ class Matrix64Display(SampleBase):
graphics.DrawText(canvas, data_font, title_x, 8, time_color, title)
graphics.DrawLine(canvas, 0, 10, 63, 10, line_color)
# Graph area: x=2..61 (60px), y=14..57 (44px)
# Graph area: x=2..61 (60px), y=18..57 (40px) - top margin for labels
graph_x = 2
graph_w = 60
graph_y_top = 14
graph_y_top = 18
graph_y_bot = 57
graph_h = graph_y_bot - graph_y_top
# Time axis: 6:00 to 21:00 (15 hours of daylight)
# Time axis: 6:00 to 21:00
t_start = 6.0
t_end = 21.0
t_range = t_end - t_start
@@ -710,62 +711,72 @@ class Matrix64Display(SampleBase):
graphics.DrawText(canvas, small_font, graph_x + 38, 63, label_color, "15")
graphics.DrawText(canvas, small_font, graph_x + 52, 63, label_color, "20")
if not self.pv_history:
graphics.DrawText(canvas, small_font, 10, 36, label_color, "Sin datos")
if not self.pv_history and not self.consumption_history:
graphics.DrawText(canvas, small_font, 10, 38, label_color, "Sin datos")
return
# Find max production for scaling
max_prod = max(p for _, p in self.pv_history)
if max_prod <= 0:
max_prod = 1
# Find max across both datasets for shared scale
max_val = 1
if self.pv_history:
max_val = max(max_val, max(p for _, p in self.pv_history))
if self.consumption_history:
max_val = max(max_val, max(p for _, p in self.consumption_history))
# Draw max value label
if max_prod >= 1000:
max_label = f"{max_prod / 1000:.1f}kW"
# Labels above graph area (with margin)
if max_val >= 1000:
max_label = f"{max_val / 1000:.1f}kW"
else:
max_label = f"{int(max_prod)}W"
graphics.DrawText(canvas, small_font, graph_x + 1, graph_y_top + 5, graphics.Color(255, 200, 0), max_label)
max_label = f"{int(max_val)}W"
graphics.DrawText(canvas, small_font, graph_x + 1, graph_y_top - 2, label_color, max_label)
# Plot curve - bucket data points into pixel columns
# Current values in top-right
if self.solar:
cur_pv = self.solar.get("production")
if cur_pv is not None:
graphics.DrawText(canvas, small_font, 34, graph_y_top - 2, graphics.Color(255, 200, 0), f"{int(cur_pv)}W")
def bucket_data(history):
buckets = [[] for _ in range(graph_w)]
for t, p in self.pv_history:
for t, p in history:
if t_start <= t <= t_end:
col = int((t - t_start) / t_range * (graph_w - 1))
col = max(0, min(graph_w - 1, col))
buckets[col].append(p)
return buckets
def draw_curve(buckets, r, g, b, fill_r, fill_g, fill_b, fill=True):
prev_y = None
pv_color = graphics.Color(255, 200, 0)
fill_color = graphics.Color(60, 50, 0)
for col in range(graph_w):
if buckets[col]:
avg_p = sum(buckets[col]) / len(buckets[col])
pixel_y = graph_y_bot - int((avg_p / max_prod) * graph_h)
pixel_y = graph_y_bot - int((avg_p / max_val) * graph_h)
pixel_y = max(graph_y_top, min(graph_y_bot, pixel_y))
# Fill area under curve
if pixel_y < graph_y_bot:
if fill and pixel_y < graph_y_bot:
for fy in range(pixel_y + 1, graph_y_bot):
canvas.SetPixel(graph_x + col, fy, 60, 50, 0)
canvas.SetPixel(graph_x + col, fy, fill_r, fill_g, fill_b)
# Draw point
canvas.SetPixel(graph_x + col, pixel_y, 255, 200, 0)
canvas.SetPixel(graph_x + col, pixel_y, r, g, b)
# Connect to previous point
# Connect to previous
if prev_y is not None:
dy = pixel_y - prev_y
steps = max(abs(dy), 1)
for s in range(1, steps):
iy = prev_y + int(dy * s / steps)
canvas.SetPixel(graph_x + col, iy, 255, 200, 0)
canvas.SetPixel(graph_x + col, iy, r, g, b)
prev_y = pixel_y
# Current production value
if self.solar and self.solar.get("production") is not None:
cur = int(self.solar["production"])
graphics.DrawText(canvas, small_font, 30, graph_y_top + 5, graphics.Color(200, 200, 200), f"{cur}W")
# Draw consumption first (behind), then production (on top)
if self.consumption_history:
cons_buckets = bucket_data(self.consumption_history)
draw_curve(cons_buckets, 80, 160, 255, 15, 25, 50)
if self.pv_history:
pv_buckets = bucket_data(self.pv_history)
draw_curve(pv_buckets, 255, 200, 0, 60, 50, 0)
if self.auto_cycle:
graphics.DrawText(canvas, small_font, 58, 62, label_color, "A")
@@ -809,7 +820,9 @@ class Matrix64Display(SampleBase):
# Update PV history every 5 minutes
if current_time - self.last_pv_history_update >= 300:
try:
self.pv_history = get_solar_history()
history = get_solar_history()
self.pv_history = history.get("production", [])
self.consumption_history = history.get("consumption", [])
except Exception as e:
print(f"Error updating PV history: {e}")
self.last_pv_history_update = current_time