From b038f2ee8f64de42f4d077c8ddbce0614de1fd72 Mon Sep 17 00:00:00 2001 From: Joan Date: Fri, 20 Mar 2026 13:33:20 +0100 Subject: [PATCH] Add consumption curve to PV graph, fix text-curve overlap --- home_assistant.py | 60 ++++++++++++++----------- matrix.py | 111 ++++++++++++++++++++++++++-------------------- 2 files changed, 97 insertions(+), 74 deletions(-) diff --git a/home_assistant.py b/home_assistant.py index c575405..96df104 100644 --- a/home_assistant.py +++ b/home_assistant.py @@ -118,8 +118,8 @@ def get_solar_status(): def get_solar_history(): - """Get today's solar production history from HA History API. - Returns list of (hour_float, watts) tuples.""" + """Get today's solar production and consumption history from HA History API. + Returns dict with 'production' and 'consumption' lists of (hour_float, watts).""" headers = { "Authorization": f"Bearer {HA_TOKEN}", "Content-Type": "application/json", @@ -127,35 +127,45 @@ def get_solar_history(): now = datetime.datetime.now() start = now.replace(hour=0, minute=0, second=0, microsecond=0) start_str = start.strftime("%Y-%m-%dT%H:%M:%S") + entities = f"{SOLAR_PRODUCTION_ENTITY},{SOLAR_LOAD_POWER_ENTITY}" url = (f"{HASS_URL}/api/history/period/{start_str}" - f"?filter_entity_id={SOLAR_PRODUCTION_ENTITY}" + f"?filter_entity_id={entities}" f"&minimal_response&no_attributes") try: response = requests.get(url, headers=headers, timeout=15) if response.status_code != 200: - return [] + return {"production": [], "consumption": []} data = response.json() - if not data or not data[0]: - return [] - points = [] - for entry in data[0]: - try: - state = float(entry.get("state", 0)) - ts = entry.get("last_changed", "") - # Parse ISO timestamp - if "+" in ts: - ts = ts.split("+")[0] - elif ts.endswith("Z"): - ts = ts[:-1] - dt = datetime.datetime.fromisoformat(ts) - # Convert UTC to local time - utc_offset = now - datetime.datetime.utcnow() - dt_local = dt + utc_offset - hour_f = dt_local.hour + dt_local.minute / 60.0 - points.append((hour_f, state)) - except (ValueError, TypeError): + utc_offset = now - datetime.datetime.utcnow() + + def parse_entries(entries): + points = [] + for entry in entries: + try: + state = float(entry.get("state", 0)) + ts = entry.get("last_changed", "") + if "+" in ts: + ts = ts.split("+")[0] + elif ts.endswith("Z"): + ts = ts[:-1] + dt = datetime.datetime.fromisoformat(ts) + dt_local = dt + utc_offset + hour_f = dt_local.hour + dt_local.minute / 60.0 + points.append((hour_f, state)) + except (ValueError, TypeError): + continue + return points + + result = {"production": [], "consumption": []} + for entity_data in data: + if not entity_data: continue - return points + entity_id = entity_data[0].get("entity_id", "") + if entity_id == SOLAR_PRODUCTION_ENTITY: + result["production"] = parse_entries(entity_data) + elif entity_id == SOLAR_LOAD_POWER_ENTITY: + result["consumption"] = parse_entries(entity_data) + return result except Exception as e: print(f"Error fetching solar history: {e}") - return [] + return {"production": [], "consumption": []} diff --git a/matrix.py b/matrix.py index 02b8ebe..2ae773d 100644 --- a/matrix.py +++ b/matrix.py @@ -134,6 +134,7 @@ class Matrix64Display(SampleBase): self.tesla = None self.solar = None self.pv_history = [] # list of (hour_float, watts) from HA history + self.consumption_history = [] # list of (hour_float, watts) self.last_pv_history_update = 0 # View state @@ -678,7 +679,7 @@ class Matrix64Display(SampleBase): graphics.DrawText(canvas, small_font, 58, 62, label_color, "A") def draw_view_6(self, canvas, fonts, colors): - """Vista Curva PV - Today's photovoltaic production curve""" + """Vista Curva PV - Today's production and consumption curves""" time_font, data_font, small_font = fonts time_color, humidity_color, hdd_color, label_color, line_color = colors @@ -688,14 +689,14 @@ class Matrix64Display(SampleBase): graphics.DrawText(canvas, data_font, title_x, 8, time_color, title) graphics.DrawLine(canvas, 0, 10, 63, 10, line_color) - # Graph area: x=2..61 (60px), y=14..57 (44px) + # Graph area: x=2..61 (60px), y=18..57 (40px) - top margin for labels graph_x = 2 graph_w = 60 - graph_y_top = 14 + graph_y_top = 18 graph_y_bot = 57 graph_h = graph_y_bot - graph_y_top - # Time axis: 6:00 to 21:00 (15 hours of daylight) + # Time axis: 6:00 to 21:00 t_start = 6.0 t_end = 21.0 t_range = t_end - t_start @@ -710,62 +711,72 @@ class Matrix64Display(SampleBase): graphics.DrawText(canvas, small_font, graph_x + 38, 63, label_color, "15") graphics.DrawText(canvas, small_font, graph_x + 52, 63, label_color, "20") - if not self.pv_history: - graphics.DrawText(canvas, small_font, 10, 36, label_color, "Sin datos") + if not self.pv_history and not self.consumption_history: + graphics.DrawText(canvas, small_font, 10, 38, label_color, "Sin datos") return - # Find max production for scaling - max_prod = max(p for _, p in self.pv_history) - if max_prod <= 0: - max_prod = 1 + # Find max across both datasets for shared scale + max_val = 1 + if self.pv_history: + max_val = max(max_val, max(p for _, p in self.pv_history)) + if self.consumption_history: + max_val = max(max_val, max(p for _, p in self.consumption_history)) - # Draw max value label - if max_prod >= 1000: - max_label = f"{max_prod / 1000:.1f}kW" + # Labels above graph area (with margin) + if max_val >= 1000: + max_label = f"{max_val / 1000:.1f}kW" else: - max_label = f"{int(max_prod)}W" - graphics.DrawText(canvas, small_font, graph_x + 1, graph_y_top + 5, graphics.Color(255, 200, 0), max_label) + max_label = f"{int(max_val)}W" + graphics.DrawText(canvas, small_font, graph_x + 1, graph_y_top - 2, label_color, max_label) - # Plot curve - bucket data points into pixel columns - buckets = [[] for _ in range(graph_w)] - for t, p in self.pv_history: - if t_start <= t <= t_end: - col = int((t - t_start) / t_range * (graph_w - 1)) - col = max(0, min(graph_w - 1, col)) - buckets[col].append(p) + # Current values in top-right + if self.solar: + cur_pv = self.solar.get("production") + if cur_pv is not None: + graphics.DrawText(canvas, small_font, 34, graph_y_top - 2, graphics.Color(255, 200, 0), f"{int(cur_pv)}W") - prev_y = None - pv_color = graphics.Color(255, 200, 0) - fill_color = graphics.Color(60, 50, 0) + def bucket_data(history): + buckets = [[] for _ in range(graph_w)] + for t, p in history: + if t_start <= t <= t_end: + col = int((t - t_start) / t_range * (graph_w - 1)) + col = max(0, min(graph_w - 1, col)) + buckets[col].append(p) + return buckets - for col in range(graph_w): - if buckets[col]: - avg_p = sum(buckets[col]) / len(buckets[col]) - pixel_y = graph_y_bot - int((avg_p / max_prod) * graph_h) - pixel_y = max(graph_y_top, min(graph_y_bot, pixel_y)) + def draw_curve(buckets, r, g, b, fill_r, fill_g, fill_b, fill=True): + prev_y = None + for col in range(graph_w): + if buckets[col]: + avg_p = sum(buckets[col]) / len(buckets[col]) + pixel_y = graph_y_bot - int((avg_p / max_val) * graph_h) + pixel_y = max(graph_y_top, min(graph_y_bot, pixel_y)) - # Fill area under curve - if pixel_y < graph_y_bot: - for fy in range(pixel_y + 1, graph_y_bot): - canvas.SetPixel(graph_x + col, fy, 60, 50, 0) + # Fill area under curve + if fill and pixel_y < graph_y_bot: + for fy in range(pixel_y + 1, graph_y_bot): + canvas.SetPixel(graph_x + col, fy, fill_r, fill_g, fill_b) - # Draw point - canvas.SetPixel(graph_x + col, pixel_y, 255, 200, 0) + # Draw point + canvas.SetPixel(graph_x + col, pixel_y, r, g, b) - # Connect to previous point - if prev_y is not None: - dy = pixel_y - prev_y - steps = max(abs(dy), 1) - for s in range(1, steps): - iy = prev_y + int(dy * s / steps) - canvas.SetPixel(graph_x + col, iy, 255, 200, 0) + # Connect to previous + if prev_y is not None: + dy = pixel_y - prev_y + steps = max(abs(dy), 1) + for s in range(1, steps): + iy = prev_y + int(dy * s / steps) + canvas.SetPixel(graph_x + col, iy, r, g, b) - prev_y = pixel_y + prev_y = pixel_y - # Current production value - if self.solar and self.solar.get("production") is not None: - cur = int(self.solar["production"]) - graphics.DrawText(canvas, small_font, 30, graph_y_top + 5, graphics.Color(200, 200, 200), f"{cur}W") + # Draw consumption first (behind), then production (on top) + if self.consumption_history: + cons_buckets = bucket_data(self.consumption_history) + draw_curve(cons_buckets, 80, 160, 255, 15, 25, 50) + if self.pv_history: + pv_buckets = bucket_data(self.pv_history) + draw_curve(pv_buckets, 255, 200, 0, 60, 50, 0) if self.auto_cycle: graphics.DrawText(canvas, small_font, 58, 62, label_color, "A") @@ -809,7 +820,9 @@ class Matrix64Display(SampleBase): # Update PV history every 5 minutes if current_time - self.last_pv_history_update >= 300: try: - self.pv_history = get_solar_history() + history = get_solar_history() + self.pv_history = history.get("production", []) + self.consumption_history = history.get("consumption", []) except Exception as e: print(f"Error updating PV history: {e}") self.last_pv_history_update = current_time