commit 490551c487021f8f4d17ba8f2fbb15f9fbc8ae48 Author: ankur Date: Tue Apr 22 05:52:48 2025 +0000 Upload files to "/" diff --git a/data_migration_end_to_end.ipynb b/data_migration_end_to_end.ipynb new file mode 100644 index 0000000..5d183e6 --- /dev/null +++ b/data_migration_end_to_end.ipynb @@ -0,0 +1,999 @@ +{ + "nbformat": 4, + "nbformat_minor": 0, + "metadata": { + "colab": { + "private_outputs": true, + "provenance": [] + }, + "kernelspec": { + "name": "python3", + "display_name": "Python 3" + }, + "language_info": { + "name": "python" + } + }, + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "7kZDwuM2BdFf" + }, + "outputs": [], + "source": [ + "import time\n", + "import logging\n", + "import traceback\n", + "from math import ceil\n", + "from os import getenv\n", + "from datetime import datetime, timedelta\n", + "from pathlib import Path\n", + "\n", + "import pandas as pd\n", + "import polars as pl\n", + "import pytz\n", + "from zoneinfo import ZoneInfo\n", + "\n", + "import boto3\n", + "import awswrangler as wr\n", + "import redshift_connector\n", + "import psycopg2\n", + "\n", + "from pymongo import MongoClient, errors\n", + "from pymongo.database import Database\n", + "from pymongo.collection import Collection\n", + "\n", + "from sqlalchemy import create_engine, text\n", + "from sqlalchemy.orm import sessionmaker\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "IST = pytz.timezone('Asia/Kolkata')\n", + "now = datetime.now(IST)\n", + "print(now.strftime('%Y-%m-%d %H:%M:%S'))\n", + "\n", + "\n", + "\n", + "UTC = pytz.UTC # or: pytz.timezone('UTC')\n", + "now = datetime.now(UTC)\n", + "print(now)\n", + "\n", + "\n", + "\n", + "\n", + "#----------------------port forwarding ---------------------\n", + "\n", + "\n", + "import subprocess\n", + "import socket\n", + "import time\n", + "\n", + "def is_port_in_use(port):\n", + " with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:\n", + " return s.connect_ex(('localhost', port)) == 0\n", + "\n", + "def start_port_forward():\n", + " try:\n", + " print(\"Starting port-forward...\")\n", + " # You can add full path to ./kubectl if needed\n", + " subprocess.Popen(\n", + " ['./kubectl', 'port-forward', '--namespace', 'mongo', 'mongo-db-dr-0', '9001:27017'],\n", + " stdout=subprocess.DEVNULL,\n", + " stderr=subprocess.DEVNULL\n", + " )\n", + " print(\"Port-forward command issued.\")\n", + " except Exception as e:\n", + " print(f\"Error starting port-forward: {e}\")\n", + "\n", + "def ensure_port_forward():\n", + " if is_port_in_use(9001):\n", + " print(\"Port 9001 is already in use — no need to start port-forward.\")\n", + " else:\n", + " print(\"Port 9001 is NOT in use — starting port-forward.\")\n", + " start_port_forward()\n", + " time.sleep(5) # Wait a few seconds to let it establish\n", + " if is_port_in_use(9001):\n", + " print(\"Port-forward established successfully\")\n", + " else:\n", + " print(\"Failed to establish port-forward \")\n", + "\n", + "if __name__ == \"__main__\":\n", + " ensure_port_forward()\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "#-----------------------------data migration start-----------------------\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "# Configure logging to print instead of writing to a file\n", + "logging.basicConfig(level=logging.INFO, format=\"%(asctime)s - %(levelname)s - %(message)s\")\n", + "def data_migration(client):\n", + " \"\"\"Fetch, process, and store data from MongoDB.\"\"\"\n", + " try:\n", + " db = client[\"lightson\"]\n", + " 
logging.info(\"Connected to MongoDB successfully.\")\n", + " print(\" Connection successfully established.\")\n", + "\n", + "\n", + "\n", + "\n", + " # New_Energy_Saving Data\n", + " collection = db['new_energySaving']\n", + " end_date = datetime.today() - timedelta(days=1)\n", + " # Ensure all required columns exist\n", + " required_columns = [\"_id\", \"serialNo\", \"devices\", \"mitigated_co2\",\"et\" , \"total_kwh\", \"deviceMapID\", \"weather_min_temp\", \"weather_max_temp\", \"billing_amount\",\n", + " \"updatedAt\", \"ac_fan_hrs\", \"ac_run_hrs\", \"algo_status\", \"clientID\", \"cost_reduction\", \"createdAt\", \"energy_savings_ref_kwh\",\n", + " \"energy_savings_us_meter\", \"energy_savings_inv_factor\", \"energy_savings_us_calc\", \"energy_savings_savings_percent\"]\n", + "\n", + " cursor = collection.find({\"createdAt\": {\"$lt\": end_date},},\n", + " {\"_id\": 1, \"serialNo\": 1, \"devices\": 1, \"mitigated_co2\": 1, \"total_kwh\": 1, \"deviceMapID\": 1, \"weather\": 1,\n", + " \"billing_amount\": 1, \"updatedAt\": 1, \"ac_fan_hrs\": 1, \"ac_run_hrs\": 1, \"algo_status\": 1,\n", + " \"clientID\": 1, \"cost_reduction\": 1, \"createdAt\": 1, \"energy_savings\": 1})\n", + " # Convert cursor to a list\n", + " data = list(cursor)\n", + " if not data:\n", + " print(\"No data found in the specified date range.\")\n", + " else:\n", + " df_nes = pd.json_normalize(data, sep='_')\n", + " df_nes = df_nes.explode(\"devices\")\n", + " # Convert ObjectId fields to strings\n", + "\n", + " for col in ['_id', 'clientID', 'deviceMapID', 'devices', \"serialNo\"]:\n", + " if col in df_nes.columns:\n", + " df_nes[col] = df_nes[col].astype(str)\n", + "\n", + " # Convert numerical fields properly\n", + "\n", + " for col in [\"mitigated_co2\", \"energy_savings_ref_kwh\", \"energy_savings_savings_percent\"]:\n", + " if col in df_nes.columns:\n", + " df_nes[col] = pd.to_numeric(df_nes[col], errors=\"coerce\")\n", + "\n", + "\n", + " for col in required_columns:\n", + " if col not in df_nes.columns:\n", + " df_nes[col] = None\n", + "\n", + " df_nes = df_nes[required_columns]\n", + "\n", + " df_nes.drop_duplicates(subset='_id')\n", + " df_nes[\"et\"] = df_nes[\"et\"].fillna(\"\")\n", + " df_nes.rename(columns={'weather_min_temp': 'min_temp','weather_max_temp': 'max_temp','energy_savings_ref_kwh': 'ref_kwh',\n", + " 'energy_savings_us_meter': 'us_meter','energy_savings_inv_factor': 'inv_factor',\n", + " 'energy_savings_us_calc': 'us_calc','energy_savings_savings_percent': 'savings_percent'}, inplace=True)\n", + " # df_nes[\"et\"] = df_nes[\"et\"].fillna(\"0\").astype(str)\n", + " df_nes.to_parquet(\"New_Energy_Saving.parquet\", use_deprecated_int96_timestamps=True, index=False, engine='pyarrow')\n", + " df_nes = pd.read_parquet(\"New_Energy_Saving.parquet\")\n", + " print(df_nes.head(3))\n", + " print(\"New_Energy_Saving.parquet has been successfully saved.\")\n", + "\n", + " # Devices Data\n", + " df_devices = pd.DataFrame(db.devices.find())\n", + " if df_devices.empty:\n", + " logging.warning(\"No data found in 'devices' collection.\")\n", + " else:\n", + " df_devices[\"_id\"] = df_devices[\"_id\"].astype(str)\n", + " df_devices[\"assignedTo\"] = df_devices[\"assignedTo\"].astype(str)\n", + " if \"ports\" in df_devices.columns:\n", + " df_devices[\"ports\"] = df_devices[\"ports\"].apply(lambda x: list(map(str, x)) if isinstance(x, list) else x)\n", + " df_devices = df_devices.explode(\"ports\")\n", + " for column in df_devices.columns:\n", + " if column not in 
[\"_id\",\"__v\",\"algorithm_type\",\"assignedTo\",\"AV1_ENABLE\",\"AV1_PENDING_STATUS\",\"AV2_ENABLE\",\"AV2_PENDING_STATUS\",\"configured\",\n", + " \"createdAt\",\"deviceId\",\"dType\",\"IS_MAINTENANCE\",\"IS_PREDICTIVE_MAINTENANCE\",\"last_change\",\"lastConfiguredAt\",\"lastFactoryResetAt\",\"live_algo_status\",\n", + " \"lp\",\"macId\",\"modelnumber\",\"ns\",\"online\",\"OTAUpdatedAt\",\"ports\",\"proto_updatedAt\",\"protocolname\",\"status\",\"temp\",\"updatedAt\",\"version\",]:\n", + " df_devices.drop(columns=column, inplace=True)\n", + " df_devices.to_parquet(\"devices.parquet\", use_deprecated_int96_timestamps = True, index = False)\n", + " df_devices = pd.read_parquet(\"devices.parquet\")\n", + " df_devices = df_devices.drop_duplicates(subset=[\"_id\"])\n", + " print(\"Devices Data:\\n\", df_devices.head(1)) # Debugging Output\n", + "\n", + " # Ports Data\n", + " df_ports = pd.DataFrame(db.ports.find())\n", + " if df_ports.empty:\n", + " logging.warning(\"No data found in 'ports' collection.\")\n", + " else:\n", + " df_ports = df_ports.astype({\"_id\": \"string\", \"device\": \"string\", \"groupId\": \"string\"}, errors=\"ignore\")\n", + " df_ports = df_ports.astype(\"string\", errors=\"ignore\")\n", + " df_ports.to_parquet(\"ports.parquet\", use_deprecated_int96_timestamps = True, index = False)\n", + " df_ports = pd.read_parquet(\"ports.parquet\")\n", + "\n", + " print(\"Ports Data:\\n\", df_ports.head(1))\n", + "\n", + " # Clients Data\n", + " df_client = pd.DataFrame(db.clients.find())\n", + " if df_client.empty:\n", + " logging.warning(\"No data found in 'clients' collection.\")\n", + " else:\n", + " df_client[\"_id\"] = df_client[\"_id\"].astype(str)\n", + " for col in [\"users\", \"feature_configuration\"]:\n", + " if col in df_client.columns:\n", + " df_client.drop(col, axis=1, inplace=True)\n", + " if \"devices\" in df_client.columns:\n", + " df_client[\"devices\"] = df_client[\"devices\"].apply(lambda x: list(map(str, x)) if isinstance(x, list) else x)\n", + " df_client = df_client.explode(\"devices\")\n", + " for column in df_client.columns:\n", + " if column not in [\"_id\",\"devices\",\"n\",\"e\",\"m\",\"addr\",\"pin\",\"city\",\"state\",\n", + " \"country\",\"createdAt\",\"updatedAt\",\"__v\",\"ct\"]:\n", + " df_client.drop(columns=column, inplace=True)\n", + " df_client.to_parquet(\"clients.parquet\" , use_deprecated_int96_timestamps = True, index = False)\n", + " df_client = pd.read_parquet(\"clients.parquet\")\n", + " print(\"Clients Data:\\n\", df_client.head(1)) # Debugging Output\n", + "\n", + " # Device Map Data\n", + " df_deviceMap = pd.DataFrame(client.lightson.deviceMap.find())\n", + " if df_deviceMap.empty:\n", + " logging.warning(\"No data found in 'deviceMap' collection.\")\n", + " else:\n", + " df_deviceMap[\"_id\"] = df_deviceMap[\"_id\"].astype(str)\n", + " df_deviceMap[\"ac_age\"] = df_deviceMap[\"ac_age\"].astype(str)\n", + " if \"clientID\" in df_deviceMap.columns:\n", + " df_deviceMap[\"clientID\"] = df_deviceMap[\"clientID\"].astype(str)\n", + " if \"devices\" in df_deviceMap.columns:\n", + " df_deviceMap[\"devices\"] = df_deviceMap[\"devices\"].apply(lambda x: list(map(str, x)) if isinstance(x, list) else x)\n", + " df_deviceMap = df_deviceMap.explode(\"devices\")\n", + " if \"facilityId\" in df_deviceMap.columns:\n", + " df_deviceMap[\"facilityId\"] = df_deviceMap[\"facilityId\"].astype(str)\n", + " df_devicemap2 = df_deviceMap.copy(deep = True)\n", + " df_deviceMap = pd.concat([df_devicemap2.drop(\"toggle_threshold\", axis = 1), 
pd.json_normalize(df_devicemap2[\"toggle_threshold\"], sep = \"_\").add_prefix(\"toggle_threshold_\").set_axis(df_devicemap2.index)], axis = 1)\n", + " for column in df_deviceMap.columns:\n", + " if column not in [\"_id\",\"__v\",\"ac_age\",\"ac_brand\",\"ac_model_number\",\"ac_ton\",\"ac_type\",\"atm_id\",\"auto_params_calculation\",\"city\",\"client_type\",\"clientID\",\"cool_down\",\n", + " \"createdAt\",\"current_kwh\",\"current_threshold\",\"devices\",\"Efan\",\"fan_consumption\",\"fan_off_equal\",\"fan_threshold\",\"fm_name\",\"fromdate\",\"installation_date\",\n", + " \"latest_ES_Date\",\"Meter\",\"mode_threshold\",\"facilityId\",\"model\",\"NAC\",\"perUnitCost\",\"phase\",\"power_correction\",\"ref_kwh\",\"run_hrs\",\"sched_lock\",\n", + " \"serialNo\",\"SIM_number\",\"site\",\"site_category\",\"site_status\",\"temp_threshold\",\"todate\",\"total_real_energy\",\"updatedAt\",\"sched_lock\",\n", + " \"sched_executed_at\",\"sched_executed_pwr\",\"sched_imminent\",\"sched_last_mismatch_at\",\"sched_corrected_at\",\"star_rating\",\"toggle_threshold_on2off\",\n", + " \"toggle_threshold_off2on\",\"toggle_threshold_notfan2fan\",\"toggle_threshold_fan2notfan\"\n", + " ]:\n", + " df_deviceMap.drop(columns=column, inplace=True)\n", + " df_deviceMap.to_parquet(\"deviceMap.parquet\", use_deprecated_int96_timestamps = True, index = False)\n", + " df_deviceMap = pd.read_parquet(\"deviceMap.parquet\").drop_duplicates(subset=['_id'], keep='first').reset_index(drop=True)\n", + "\n", + " print(\"Device Map Data:\\n\", df_deviceMap.head(1))\n", + "\n", + "\n", + " # daily_dump\n", + "\n", + " df_dump = pd.read_csv(f\"https://docs.google.com/spreadsheets/d/{'16CfL8N28-2nqUxWkbRGQVKlGViW0SoGlbsX4cUIzxKY'}/export?format=csv\")\n", + " df_dump.columns = df_dump.columns.str.replace(' ', '_')\n", + " df_dump = df_dump.astype(\"string\", errors=\"ignore\")\n", + " df_dump[\"Date\"] = pd.to_datetime(df_dump[\"Date\"], format = \"%d/%m/%Y\", errors=\"coerce\")\n", + " df_dump.to_parquet(\"daily_dump.parquet\", index = False, use_deprecated_int96_timestamps=True)\n", + " df_dump = pd.read_parquet(\"daily_dump.parquet\")\n", + "\n", + "\n", + "\n", + "\n", + "\n", + " df_deviceMap = pd.read_parquet(\"deviceMap.parquet\").drop_duplicates(subset=['_id'], keep='first').reset_index(drop=True)\n", + " df_clients = pd.read_parquet(\"clients.parquet\")\n", + " df_ports = pd.read_parquet(\"ports.parquet\")\n", + " df_nes = pd.read_parquet(\"New_Energy_Saving.parquet\")\n", + " df_devices = pd.read_parquet(\"devices.parquet\")\n", + " df_dump = pd.read_parquet(\"daily_dump.parquet\")\n", + "\n", + " session = boto3.Session(\n", + " aws_access_key_id = \"AKIARXCGNNYDRHFBBB6I\",\n", + " aws_secret_access_key = \"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\",\n", + " region_name=\"ap-south-1\")\n", + "\n", + " s3_path = \"s3://daily-dump-bucket/daily_dump\"\n", + "\n", + "\n", + " conn_redshift = wr.redshift.connect(secret_id=\"sqlworkbench!d5e4ab06-e3af-4c43-9bb9-d48649ab72cb\", boto3_session=session)\n", + "\n", + " # Copy each DataFrame to its Redshift table (dict keys are the target table names)\n", + " dfs = {\"devicemap\": df_deviceMap,\"clients\": df_clients,\"ports\": df_ports,\"new_energy_saving\": df_nes,\"devices\": df_devices ,\"daily_dump\": df_dump}\n", + " for name, df in dfs.items():\n", + " wr.redshift.copy(df=df,\n", + " path= s3_path,\n", + " 
con=conn_redshift,\n", + " schema=\"lightson\",\n", + " table=name,\n", + " mode=\"overwrite\",\n", + " varchar_lengths_default=65535,\n", + " boto3_session=session)\n", + " time.sleep(5) # Delay to prevent rapid successive writes\n", + "\n", + "\n", + " # Append today's dump to the cumulative history table as well\n", + " wr.redshift.copy(df=df_dump,\n", + " path= s3_path,\n", + " con=conn_redshift,\n", + " schema=\"lightson\",\n", + " table=\"daily_dump_historical\",\n", + " mode=\"append\",\n", + " varchar_lengths_default=65535,\n", + " boto3_session=session)\n", + "\n", + " logging.info(\"Data migration completed successfully.\")\n", + " return df_devices, df_ports, df_client, df_deviceMap , df_nes , df_dump\n", + "\n", + " except errors.ServerSelectionTimeoutError:\n", + " logging.error(\" MongoDB connection failed! Is the server running?\")\n", + " return None\n", + " except Exception as e:\n", + " logging.error(f\" Unexpected error: {e}\")\n", + " return None\n", + "\n", + "\n", + "# Example usage\n", + "try:\n", + " client = MongoClient(\"mongodb://localhost:9001/lightson?directConnection=true&serverSelectionTimeoutMS=5000\")\n", + " logging.info(\"MongoDB client created successfully.\")\n", + "\n", + " df_devices, df_ports, df_client, df_deviceMap , df_nes , df_dump = data_migration(client)\n", + "\n", + " # Final check if data was retrieved\n", + " if df_nes is not None:\n", + " print(\"\\n Migration completed successfully.\")\n", + " else:\n", + " print(\"\\n Migration failed. Check logs for details.\")\n", + "except Exception as e:\n", + " logging.error(f\" Fatal error in script: {e}\")\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "#---------------------------------------------dls migration ------------------------------\n", + "\n", + "\n", + "import pandas as pd\n", + "import boto3\n", + "import awswrangler as wr\n", + "from datetime import datetime, timedelta\n", + "from pymongo import MongoClient\n", + "from pathlib import Path\n", + "import os\n", + "import polars as pl\n", + "from pytz import timezone\n", + "\n", + "# Constants\n", + "UTC = timezone('UTC')\n", + "IST = timezone('Asia/Kolkata')\n", + "\n", + "def main():\n", + " try:\n", + " # Initialize connections and settings\n", + " client = MongoClient(\"mongodb://localhost:9001/lightson?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+1.8.0\")\n", + " db = client['lightson']\n", + " collection = db['dls']\n", + "\n", + " s3_path = \"s3://daily-dump-bucket/ankur-v\"\n", + " session = boto3.Session(\n", + " aws_access_key_id=\"AKIARXCGNNYDRHFBBB6I\",\n", + " aws_secret_access_key=\"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\",\n", + " region_name=\"ap-south-1\"\n", + " )\n", + "\n", + " conn_redshift = wr.redshift.connect(secret_id=\"sqlworkbench!d5e4ab06-e3af-4c43-9bb9-d48649ab72cb\", boto3_session=session)\n", + "\n", + " # Clean up previous file if exists\n", + " Path(\"dls.csv\").unlink(missing_ok=True)\n", + "\n", + " # Batch size for processing\n", + " batch_size = 200000\n", + " output_file = \"dls.csv\"\n", + "\n", + " # Same time yesterday (IST), converted to naive UTC\n", + " start_date = IST.localize(datetime.today() - timedelta(days=1)).astimezone(UTC).replace(tzinfo=None)\n", + "\n", + " # Define the expected columns (schema) for the CSV file\n", + " columns = [\n", + " \"_id\", \"__v\", \"m\", \"t\", \"pl_m\", \"pl_v\", \"pl_Algo_flag\", \"pl_pir\", \"pl_DS\", \"pl_F\",\n", + " \"pl_https_exp\", \"pl_Humidity\", \"pl_LMF\", \"pl_Mo\", 
\"pl_mqtts_exp\", \"pl_multiple_flag\",\n", + " \"pl_NS\", \"pl_online\", \"dId\", \"cId\", \"ports\", \"updatedAt\", \"createdAt\",\n", + " \"documentUpdatedAt\", \"pl_tz_set\", \"pl_timestamp\", \"pl_oem\",\n", + " \"pl_off_pkt\", \"pl_OL\", \"pl_P\", \"pl_pf\", \"pl_ph_ang\", \"pl_proto\", \"pl_publish\",\n", + " \"pl_room_humid\", \"pl_room_temp\", \"pl_sens_pos\", \"pl_single_flag\", \"pl_T\",\n", + " \"pl_Tempc\", \"pl_Time\", \"pl_tot_run_hrs\", \"pl_tz\", \"pl_meter\", \"pl_voltage\",\n", + " \"pl_current\", \"pl_power\", \"pl_freq\", \"pl_kwh\", \"pl_kwh_5min\",\n", + " \"pl_run_hrs_5min\", \"pl_ac_p\", \"pl_ac_mo\", \"pl_ac_fs\", \"pl_ac_temp\",\n", + " \"pl_ac_hs\", \"pl_ac_vs\", \"pl_rssi\"\n", + " ]\n", + "\n", + " # Function to process and append chunks to CSV\n", + " def process_and_append_to_csv(skip, first_chunk):\n", + " cursor = collection.find(\n", + " {\n", + " \"t\": \"p\",\n", + " \"createdAt\": {\"$gte\": start_date},\n", + " },\n", + " {\"_id\": 1, \"__v\": 1, \"m\": 1, \"t\": 1, \"pl\": 1, \"dId\": 1, \"cId\": 1, \"pVal\": 1,\n", + " \"ports\": 1, \"updatedAt\": 1, \"createdAt\": 1, \"documentUpdatedAt\": 1}\n", + " ).skip(skip).limit(batch_size)\n", + "\n", + " data = list(cursor)\n", + " if not data:\n", + " return False\n", + "\n", + " df = pd.json_normalize(data, sep=\"_\")\n", + " print(df[\"updatedAt\"].min(), df[\"updatedAt\"].max())\n", + "\n", + " if \"ports\" in df.columns:\n", + " df[\"ports\"] = df[\"ports\"].apply(lambda x: str(x[0]) if isinstance(x, list) and x else None)\n", + "\n", + " df.loc[:, ~df.columns.isin([\"updatedAt\", \"createdAt\", \"documentUpdatedAt\"])] = \\\n", + " df.loc[:, ~df.columns.isin([\"updatedAt\", \"createdAt\", \"documentUpdatedAt\"])].astype(str)\n", + "\n", + " for col in columns:\n", + " if col not in df.columns:\n", + " df[col] = None\n", + "\n", + " df[\"pl_power\"] = df['pl_power'].replace('nan', None)\n", + " df[\"pl_Humidity\"] = df['pl_Humidity'].replace('nan', None)\n", + " df[\"pl_Tempc\"] = df['pl_Tempc'].replace('nan', None)\n", + " df[\"pl_room_temp\"] = df['pl_room_temp'].replace('nan', None)\n", + " df[\"pl_kwh_5min\"] = df['pl_kwh_5min'].replace('nan', None)\n", + "\n", + " df = df[columns]\n", + " df.to_csv(output_file, mode='a', header=first_chunk, index=False)\n", + " return True\n", + "\n", + " # Processing the data in chunks\n", + " skip = 0\n", + " first_chunk = True\n", + " chunk_number = 0\n", + "\n", + " while True:\n", + " if not process_and_append_to_csv(skip, first_chunk):\n", + " break\n", + "\n", + " chunk_number += 1\n", + " print(f\"Chunk {chunk_number} successfully appended.\")\n", + " skip += batch_size\n", + " first_chunk = False\n", + "\n", + " print(f\"Data has been written to {output_file}. 
Total chunks processed: {chunk_number}\")\n", + "\n", + " # Convert CSV to Parquet\n", + " df = pl.scan_csv(\"dls.csv\", infer_schema=False)\n", + " cols = df.collect_schema().names()\n", + " print(f\"Columns count: {len(cols)}, Unique columns (case-insensitive): {len(set(x.lower() for x in cols))}\")\n", + "\n", + " df_final = df.with_columns(\n", + " pl.col(\"createdAt\").str.to_datetime(\"%Y-%m-%d %H:%M:%S%.f\", strict=False).alias(\"createdAt\"),\n", + " pl.col(\"updatedAt\").str.to_datetime(\"%Y-%m-%d %H:%M:%S%.f\", strict=False).alias(\"updatedAt\"),\n", + " pl.col(\"documentUpdatedAt\").str.to_datetime(\"%Y-%m-%d %H:%M:%S%.f\", strict=False).alias(\"documentUpdatedAt\"),\n", + " ).collect().write_parquet(\n", + " \"dls_updated.parquet\",\n", + " compression=\"snappy\",\n", + " use_pyarrow=True,\n", + " pyarrow_options={\"use_deprecated_int96_timestamps\": True}\n", + " )\n", + "\n", + " # Filter out null createdAt records\n", + " df2 = pl.scan_parquet(\"dls_updated.parquet\")\n", + " df2.filter(pl.col(\"createdAt\").is_not_null()).collect().write_parquet(\n", + " \"dls_updated_final2.parquet\",\n", + " compression=\"snappy\",\n", + " use_pyarrow=True,\n", + " pyarrow_options={\"use_deprecated_int96_timestamps\": True}\n", + " )\n", + "\n", + " # Verify data\n", + " dls_df = pl.scan_parquet(\"dls_updated_final2.parquet\")\n", + " min_date, max_date = dls_df.select(pl.min(\"updatedAt\")).collect(), dls_df.select(pl.max(\"updatedAt\")).collect()\n", + " print(f\"Data range - Min: {min_date}, Max: {max_date}\")\n", + " print(\"dls is ready\")\n", + "\n", + " # Final cleanup and upload preparation\n", + " df = pd.read_parquet(\"dls_updated_final2.parquet\")\n", + " df[['pl_power','pl_Humidity','pl_Tempc','pl_room_temp','pl_kwh_5min']] = \\\n", + " df[['pl_power','pl_Humidity','pl_Tempc','pl_room_temp','pl_kwh_5min']].replace('nan', None)\n", + " df = df.dropna(axis=1, how='all')\n", + " df.to_parquet(\"dls_updated_final2.parquet\", use_deprecated_int96_timestamps=True)\n", + "\n", + " print(\"Data migration completed successfully!\")\n", + "\n", + " except Exception as e:\n", + " print(f\"Error during data migration: {str(e)}\")\n", + " raise\n", + " finally:\n", + " # Clean up resources\n", + " if 'client' in locals():\n", + " client.close()\n", + " if 'conn_redshift' in locals():\n", + " conn_redshift.close()\n", + "\n", + "\n", + "if __name__ == \"__main__\":\n", + " main()\n", + "\n", + "\n", + "\n", + "# ==== AWS + S3 Config ====\n", + "aws_access_key_id = 'AKIARXCGNNYDRHFBBB6I'\n", + "aws_secret_access_key = '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'\n", + "region_name = 'ap-south-1'\n", + "bucket_name = 'ankur-v'\n", + "s3_output_key = 'dls.parquet'\n", + "local_file_path = 'dls_updated_final2.parquet'\n", + "s3_path = f\"s3://{bucket_name}/{s3_output_key}\"\n", + "\n", + "# ==== Redshift Config ====\n", + "redshift_host = 'ml-test-1.cfppg9yhkagj.ap-south-1.redshift.amazonaws.com'\n", + "redshift_db = 'dev'\n", + "redshift_user = 'awsuser'\n", + "redshift_password = 'LtRedshift-2024'\n", + "redshift_port = 5439\n", + "\n", + "# ==== SQLAlchemy Config ====\n", + "engine = create_engine(\n", + " f'postgresql+psycopg2://{redshift_user}:{redshift_password}@{redshift_host}:{redshift_port}/{redshift_db}'\n", + ")\n", + "Session = sessionmaker(bind=engine)\n", + "\n", + "# ==== Step 1: Upload to S3 ====\n", + "def upload_to_s3():\n", + " print(\"🔼 Uploading file to S3...\")\n", + " session = boto3.Session(\n", + " aws_access_key_id=aws_access_key_id,\n", + " 
aws_secret_access_key=aws_secret_access_key,\n", + " region_name=region_name\n", + " )\n", + " s3 = session.resource('s3')\n", + " s3.meta.client.upload_file(Filename=local_file_path, Bucket=bucket_name, Key=s3_output_key)\n", + " print(\"✅ File uploaded to S3.\")\n", + "\n", + "# ==== Step 2: Copy to Redshift (dls_today) ====\n", + "def copy_to_redshift():\n", + " print(\"⬇️ Copying data into Redshift temporary table...\")\n", + " conn = psycopg2.connect(\n", + " host=redshift_host,\n", + " port=redshift_port,\n", + " database=redshift_db,\n", + " user=redshift_user,\n", + " password=redshift_password\n", + " )\n", + " cursor = conn.cursor()\n", + " cursor.execute(\"truncate table lightson.dls_today;\")\n", + " copy_query = f\"\"\"\n", + " COPY lightson.dls_today\n", + " FROM '{s3_path}'\n", + " CREDENTIALS 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}'\n", + " FORMAT AS PARQUET;\n", + " \"\"\"\n", + " cursor.execute(copy_query)\n", + " conn.commit()\n", + " cursor.close()\n", + " conn.close()\n", + " print(\"✅ Copied data to dls_today.\")\n", + "\n", + "# ==== Step 3: Merge data (delete + insert) ====\n", + "def merge_into_dls():\n", + " print(\"🔄 Merging dls_today into dls...\")\n", + " try:\n", + " with Session() as session:\n", + " # Delete overlapping records\n", + " delete_result = session.execute(text(\"\"\"\n", + " DELETE FROM lightson.dls\n", + " WHERE _id IN (\n", + " SELECT _id FROM lightson.dls\n", + " INTERSECT\n", + " SELECT _id FROM lightson.dls_today\n", + " )\n", + " \"\"\"))\n", + " session.commit()\n", + " print(f\"🗑️ Rows deleted from dls: {delete_result.rowcount}\")\n", + "\n", + " with Session() as session:\n", + " # Insert new records\n", + " insert_result = session.execute(text(\"\"\"\n", + " INSERT INTO lightson.dls (\n", + " _id, __v, m, t, pl_m, pl_v, pl_algo_flag, pl_ds, pl_f, pl_https_exp,\n", + " pl_humidity, pl_lmf, pl_mo, pl_mqtts_exp, pl_multiple_flag, pl_ns,\n", + " pl_online, did, cid, ports_0, updatedat, createdat, documentupdatedat,\n", + " pl_tz_set, pl_timestamp, pl_oem, pl_off_pkt, pl_ol, pl_p, pl_pf,\n", + " pl_ph_ang, pl_proto, pl_publish, pl_room_humid, pl_room_temp,\n", + " pl_sens_pos, pl_single_flag, pl_t, pl_tempc, pl_time, pl_tot_run_hrs,\n", + " pl_tz, pl_meter, pl_voltage, pl_current, pl_power, pl_freq, pl_kwh,\n", + " pl_kwh_5min, pl_run_hrs_5min, pl_ac_p, pl_ac_mo, pl_ac_fs, pl_ac_temp,\n", + " pl_ac_hs, pl_ac_vs, pl_rssi\n", + " )\n", + " SELECT\n", + " _id, __v, m, t, pl_m, pl_v, pl_algo_flag, pl_ds,\n", + " NULL AS pl_f, NULL AS pl_https_exp, pl_humidity,\n", + " NULL AS pl_lmf, NULL AS pl_mo, NULL AS pl_mqtts_exp,\n", + " NULL AS pl_multiple_flag, pl_ns, pl_online, did, cid,\n", + " ports AS ports_0, updatedat, createdat, documentupdatedat,\n", + " pl_tz_set, pl_timestamp, pl_oem, pl_off_pkt,\n", + " NULL AS pl_ol, NULL AS pl_p, pl_pf, pl_ph_ang,\n", + " pl_proto, NULL AS pl_publish, pl_room_humid,\n", + " pl_room_temp, pl_sens_pos, NULL AS pl_single_flag,\n", + " NULL AS pl_t, pl_tempc, pl_time, pl_tot_run_hrs,\n", + " NULL AS pl_tz, pl_meter, pl_voltage, pl_current,\n", + " pl_power, pl_freq, pl_kwh, pl_kwh_5min, pl_run_hrs_5min,\n", + " pl_ac_p, pl_ac_mo, pl_ac_fs, pl_ac_temp, pl_ac_hs,\n", + " pl_ac_vs, pl_rssi\n", + " FROM lightson.dls_today\n", + " \"\"\"))\n", + " session.commit()\n", + " print(f\"✅ Rows inserted into dls: {insert_result.rowcount}\")\n", + " except Exception as e:\n", + " print(\"❌ Merge failed:\", e)\n", + " traceback.print_exc()\n", + "\n", + "# ==== Main Runner ====\n", + "if __name__ == 
\"__main__\":\n", + " try:\n", + " upload_to_s3()\n", + " copy_to_redshift()\n", + " merge_into_dls()\n", + " print(\"🎉 Data pipeline complete.\")\n", + " except Exception as e:\n", + " print(\"🔥 Fatal error:\", e)\n", + " traceback.print_exc()\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "#---------------------------------------------------------meter raw ------------------------------------------------------------\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "import pandas as pd\n", + "import boto3\n", + "import awswrangler as wr\n", + "from datetime import datetime, timedelta\n", + "from pymongo import MongoClient\n", + "from pathlib import Path\n", + "import os\n", + "import polars as pl\n", + "from pytz import timezone\n", + "from math import ceil\n", + "import time\n", + "import psycopg2\n", + "from sqlalchemy import create_engine, text\n", + "from sqlalchemy.orm import sessionmaker\n", + "import traceback\n", + "\n", + "# Constants\n", + "IST = timezone('Asia/Kolkata')\n", + "UTC = timezone('UTC')\n", + "CHUNK_SIZE = 100000\n", + "\n", + "def meter_raw_migration():\n", + " try:\n", + " print(\"Starting meter_raw_migration process...\")\n", + "\n", + " # ==== Configuration ====\n", + " # Define start_date (yesterday) and end_date (today) in UTC\n", + " start_date = (datetime.today() - timedelta(days=1)).replace(tzinfo=IST).astimezone(UTC).replace(tzinfo=None)\n", + " end_date = datetime.utcnow()\n", + "\n", + " # AWS and Redshift configuration\n", + " aws_access_key_id = 'AKIARXCGNNYDRHFBBB6I'\n", + " aws_secret_access_key = 'LFBwt03wgZK43fJ70B6AsmrOhS5XMtncOyNaI6Zh'\n", + " region_name = 'ap-south-1'\n", + " bucket_name = 'ankur-v'\n", + " s3_output_key = 'meterraw.parquet'\n", + " redshift_host = 'ml-test-1.cfppg9yhkagj.ap-south-1.redshift.amazonaws.com'\n", + " redshift_db = 'dev'\n", + " redshift_user = 'awsuser'\n", + " redshift_password = 'LtRedshift-2024'\n", + " redshift_port = 5439\n", + "\n", + " # ==== Data Processing ====\n", + " # Create output directory\n", + " root_path = Path.cwd() / \"meterRaw\" / end_date.strftime(\"%Y_%m_%d\")\n", + " root_path.mkdir(parents=True, exist_ok=True)\n", + "\n", + " # Define schema and columns\n", + " cols = [\n", + " '_id', 'm', 'DS', 'MS', 'Power(W)', 'ac_power', 'angle', 'current_kwh',\n", + " 'fan_speed', 'freq', 'mode', 'serialNo', 'temp', 'updatedAt', 'room_temp',\n", + " 'room_humid', 'ac_hs', 'ac_vs', 'kwh_5min', 'v', 'timestamp', 'oem',\n", + " 'Algo_flag', 'createdAt', 'current_phase_1', 'current_phase_2',\n", + " 'current_phase_3', 'voltage_phase_1', 'voltage_phase_2', 'voltage_phase_3',\n", + " 'power_factor_phase_1', 'power_factor_phase_2', 'power_factor_phase_3'\n", + " ]\n", + "\n", + " schema = {\n", + " \"_id\": \"string\",\n", + " \"updatedAt\": \"datetime64[ns]\",\n", + " \"createdAt\": \"datetime64[ns]\",\n", + " \"m\": \"string\",\n", + " \"current_phase_1\": \"Float64\",\n", + " \"current_phase_2\": \"Float64\",\n", + " \"current_phase_3\": \"Float64\",\n", + " \"DS\": \"boolean\",\n", + " \"MS\": \"boolean\",\n", + " \"Power(W)\": \"Float64\",\n", + " \"voltage_phase_1\": \"Float64\",\n", + " \"voltage_phase_2\": \"Float64\",\n", + " \"voltage_phase_3\": \"Float64\",\n", + " \"ac_power\": \"Int64\",\n", + " \"angle\": \"Float64\",\n", + " \"current_kwh\": \"Float64\",\n", + " \"fan_speed\": \"Int64\",\n", + " 
\"freq\": \"Float64\",\n", + " \"mode\": \"Int64\",\n", + " \"power_factor_phase_1\": \"Float64\",\n", + " \"power_factor_phase_2\": \"Float64\",\n", + " \"power_factor_phase_3\": \"Float64\",\n", + " \"serialNo\": \"string\",\n", + " \"temp\": \"Float64\",\n", + " \"room_temp\": \"Float64\",\n", + " \"room_humid\": \"Float64\",\n", + " \"ac_hs\": \"Int64\",\n", + " \"ac_vs\": \"Int64\",\n", + " \"kwh_5min\": \"Float64\",\n", + " \"v\": \"string\",\n", + " \"timestamp\": \"string\",\n", + " \"oem\": \"string\",\n", + " \"Algo_flag\": \"string\"\n", + " }\n", + "\n", + " # MongoDB connection\n", + " client = MongoClient(\"mongodb://localhost:9001/lightson?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+1.8.0\")\n", + " db = client.get_database(\"lightson\")\n", + " coll = db.get_collection(\"meterRaw\")\n", + "\n", + " # Query definitions\n", + " query = {\n", + " \"single\": {\n", + " \"Current.0\": {\"$exists\": False}\n", + " },\n", + " \"three\": {\n", + " \"Current.0\": {\"$exists\": True}\n", + " }\n", + " }\n", + "\n", + " # Helper function for chunk processing\n", + " def get_chunk(rows: int, completed_count: int = 0, chunk_size=CHUNK_SIZE):\n", + " chunks = ceil((rows - completed_count) / chunk_size)\n", + " for i in range(chunks):\n", + " skip = completed_count + (i * chunk_size)\n", + " limit = min(rows - skip, chunk_size)\n", + " yield skip, limit\n", + "\n", + " # ==== Process Single Phase Data ====\n", + " print(\"Processing single phase data...\")\n", + " query_single_phase = query[\"single\"].copy()\n", + " query_single_phase.update({\n", + " \"updatedAt\": {\n", + " \"$gte\": start_date,\n", + " \"$lte\": end_date\n", + " }\n", + " })\n", + "\n", + " count = coll.count_documents(query_single_phase)\n", + " idx = 0\n", + "\n", + " for skip, limit in get_chunk(count):\n", + " df = pd.DataFrame(list(coll.find(query_single_phase, skip=skip, limit=limit)))\n", + " print(f\"Processing single phase chunk {idx}: {skip} to {skip+limit} | Date range: {df['updatedAt'].min()} to {df['updatedAt'].max()}\")\n", + "\n", + " df[\"_id\"] = df[\"_id\"].astype(str)\n", + " df = df.rename(columns={\n", + " \"Current\": \"current_phase_1\",\n", + " \"Voltage\": \"voltage_phase_1\",\n", + " \"power_factor\": \"power_factor_phase_1\"\n", + " }).reindex(columns=cols).astype(schema)\n", + "\n", + " df.to_parquet(root_path / f\"single_phase_idx_{idx}.parquet\", use_deprecated_int96_timestamps=True)\n", + " idx += 1\n", + "\n", + " # ==== Process Three Phase Data ====\n", + " print(\"Processing three phase data...\")\n", + " query_three_phase = query[\"three\"].copy()\n", + " query_three_phase.update({\n", + " \"updatedAt\": {\n", + " \"$gte\": start_date,\n", + " \"$lte\": end_date\n", + " }\n", + " })\n", + "\n", + " count = coll.count_documents(query_three_phase)\n", + " idx = 0\n", + "\n", + " for skip, limit in get_chunk(count):\n", + " df = pd.DataFrame(list(coll.find(query_three_phase, skip=skip, limit=limit)))\n", + " print(f\"Processing three phase chunk {idx}: {skip} to {skip+limit} | Date range: {df['updatedAt'].min()} to {df['updatedAt'].max()}\")\n", + "\n", + " # Expand array columns\n", + " for key in (\"Current\", \"Voltage\", \"power_factor\"):\n", + " array_col = df.pop(key)\n", + " df[[f\"{key.lower()}_phase_1\", f\"{key.lower()}_phase_2\", f\"{key.lower()}_phase_3\"]] = array_col.to_list()\n", + "\n", + " df[\"_id\"] = df[\"_id\"].astype(str)\n", + " df = df.reindex(columns=cols).astype(schema)\n", + " df.to_parquet(root_path / 
f\"three_phase_idx_{idx}.parquet\", use_deprecated_int96_timestamps=True)\n", + " idx += 1\n", + "\n", + " # ==== Combine and Finalize Data ====\n", + " print(\"Combining all data files...\")\n", + " df = pl.scan_parquet(root_path / \"*.parquet\")\n", + " final_filename = f\"meterRaw_{end_date.strftime('%Y_%m_%d')}.parquet\"\n", + " df.collect().write_parquet(\n", + " Path.cwd() / final_filename,\n", + " compression=\"snappy\",\n", + " use_pyarrow=True,\n", + " pyarrow_options={\"use_deprecated_int96_timestamps\": True}\n", + " )\n", + "\n", + " # Clean up temporary files\n", + " for children in root_path.glob(\"*.parquet\"):\n", + " children.unlink(missing_ok=True)\n", + " root_path.rmdir()\n", + "\n", + " # Verify final file\n", + " df2 = pl.scan_parquet(Path.cwd() / final_filename)\n", + " date_range = df2.select([\n", + " pl.col(\"createdAt\").min().alias(\"min\"),\n", + " pl.col(\"createdAt\").max().alias(\"max\")\n", + " ]).collect()\n", + " print(f\"Data range in final file: {date_range}\")\n", + " print(\"meterraw_is_generated\")\n", + "\n", + " # ==== Upload to S3 ====\n", + " print(\"🔼 Uploading MeterRaw to S3...\")\n", + " session = boto3.Session(\n", + " aws_access_key_id=aws_access_key_id,\n", + " aws_secret_access_key=aws_secret_access_key,\n", + " region_name=region_name\n", + " )\n", + " s3 = session.resource('s3')\n", + " s3.meta.client.upload_file(\n", + " Filename=final_filename,\n", + " Bucket=bucket_name,\n", + " Key=s3_output_key\n", + " )\n", + " print(\"✅ File uploaded to S3.\")\n", + "\n", + " # ==== Redshift Operations ====\n", + " # Initialize SQLAlchemy engine\n", + " engine = create_engine(\n", + " f'postgresql+psycopg2://{redshift_user}:{redshift_password}@{redshift_host}:{redshift_port}/{redshift_db}'\n", + " )\n", + " Session = sessionmaker(bind=engine)\n", + "\n", + " # ==== Copy to Redshift (meter_today) ====\n", + " print(\"⬇️ Copying data into Redshift meter_today...\")\n", + " s3_path = f\"s3://{bucket_name}/{s3_output_key}\"\n", + "\n", + "\n", + "\n", + " conn = psycopg2.connect(\n", + " host=redshift_host,\n", + " port=redshift_port,\n", + " database=redshift_db,\n", + " user=redshift_user,\n", + " password=redshift_password)\n", + "\n", + "\n", + "\n", + "\n", + " cursor = conn.cursor()\n", + " cursor.execute(\"TRUNCATE TABLE lightson.meter_today;\")\n", + " cursor.execute(f\"\"\"COPY lightson.meter_today FROM '{s3_path}' CREDENTIALS 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}' FORMAT AS PARQUET;\"\"\")\n", + " conn.commit()\n", + " print(\"✅ Copied data to meter_today.\")\n", + "\n", + " # ==== Merge into meterraw ====\n", + " print(\"🔄 Merging meter_today into meterraw...\")\n", + " try:\n", + " with Session() as session:\n", + " # Delete duplicates\n", + " delete_result = session.execute(text(\"\"\"\n", + " DELETE FROM lightson.meterraw\n", + " WHERE _id IN (\n", + " SELECT _id FROM lightson.meterraw\n", + " INTERSECT\n", + " SELECT _id FROM lightson.meter_today\n", + " )\n", + " \"\"\"))\n", + " session.commit()\n", + " print(f\"🗑️ Deleted rows from meterraw: {delete_result.rowcount}\")\n", + "\n", + " # Insert new records\n", + " insert_result = session.execute(text(\"\"\"\n", + " INSERT INTO lightson.meterraw (\n", + " _id, m, ds, ms, power_w_, ac_power, angle, current_kwh, fan_speed,\n", + " freq, mode, serialno, temp, updatedat, createdat, current_phase_1,\n", + " current_phase_2, current_phase_3, voltage_phase_1, voltage_phase_2,\n", + " voltage_phase_3, power_factor_phase_1, power_factor_phase_2, 
power_factor_phase_3,\n", + " room_temp, room_humid, ac_hs, ac_vs, kwh_5min, v, \"timestamp\", oem, algo_flag\n", + " )\n", + " SELECT\n", + " _id, m,\n", + " CAST(ds AS BOOLEAN),\n", + " CAST(ms AS BOOLEAN),\n", + " \"power(w)\" AS power_w_,\n", + " ac_power, angle, current_kwh, fan_speed, freq, mode, serialno, temp,\n", + " updatedat, createdat, current_phase_1, current_phase_2, current_phase_3,\n", + " voltage_phase_1, voltage_phase_2, voltage_phase_3,\n", + " power_factor_phase_1, power_factor_phase_2, power_factor_phase_3,\n", + " room_temp, room_humid, ac_hs, ac_vs, kwh_5min, v, \"timestamp\", oem, algo_flag\n", + " FROM lightson.meter_today\n", + " \"\"\"))\n", + " session.commit()\n", + " print(f\"✅ Inserted rows into meterraw: {insert_result.rowcount}\")\n", + "\n", + " except Exception as e:\n", + " print(\"❌ Merge failed:\", e)\n", + " traceback.print_exc()\n", + " raise\n", + "\n", + " print(\"🎉 MeterRaw pipeline complete.\")\n", + "\n", + " except Exception as e:\n", + " print(\"🔥 Fatal error in pipeline:\", e)\n", + " traceback.print_exc()\n", + " raise\n", + " finally:\n", + " # Clean up resources\n", + " if 'client' in locals():\n", + " client.close()\n", + " if 'engine' in locals():\n", + " engine.dispose()\n", + " print(\"Resources cleaned up.\")\n", + "\n", + "if __name__ == \"__main__\":\n", + " meter_raw_migration()\n" + ] + }, + { + "cell_type": "code", + "source": [], + "metadata": { + "id": "0qJ5OXpnBdzk" + }, + "execution_count": null, + "outputs": [] + } + ] +} \ No newline at end of file