diff --git a/data_migration_end_to_end.ipynb b/data_migration_end_to_end.ipynb index 5d183e6..f9f8075 100644 --- a/data_migration_end_to_end.ipynb +++ b/data_migration_end_to_end.ipynb @@ -1,999 +1,961 @@ -{ - "nbformat": 4, - "nbformat_minor": 0, - "metadata": { - "colab": { - "private_outputs": true, - "provenance": [] - }, - "kernelspec": { - "name": "python3", - "display_name": "Python 3" - }, - "language_info": { - "name": "python" - } - }, - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "7kZDwuM2BdFf" - }, - "outputs": [], - "source": [ - "import time\n", - "import logging\n", - "import traceback\n", - "from math import ceil\n", - "from os import getenv\n", - "from datetime import datetime, timedelta\n", - "from pathlib import Path\n", - "\n", - "import pandas as pd\n", - "import polars as pl\n", - "import pytz\n", - "from zoneinfo import ZoneInfo\n", - "\n", - "import boto3\n", - "import awswrangler as wr\n", - "import redshift_connector\n", - "import psycopg2\n", - "\n", - "from pymongo import MongoClient, errors\n", - "from pymongo.database import Database\n", - "from pymongo.collection import Collection\n", - "\n", - "from sqlalchemy import create_engine, text\n", - "from sqlalchemy.orm import sessionmaker\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "IST = pytz.timezone('Asia/Kolkata')\n", - "now = datetime.now(IST)\n", - "print(now.strftime('%Y-%m-%d %H:%M:%S'))\n", - "\n", - "\n", - "\n", - "UTC = pytz.UTC # or: pytz.timezone('UTC')\n", - "now = datetime.now(UTC)\n", - "print(now)\n", - "\n", - "\n", - "\n", - "\n", - "#----------------------port forwarding ---------------------\n", - "\n", - "\n", - "import subprocess\n", - "import socket\n", - "import time\n", - "\n", - "def is_port_in_use(port):\n", - " with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:\n", - " return s.connect_ex(('localhost', port)) == 0\n", - "\n", - "def start_port_forward():\n", - " try:\n", - " print(\"Starting port-forward...\")\n", - " # You can add full path to ./kubectl if needed\n", - " subprocess.Popen(\n", - " ['./kubectl', 'port-forward', '--namespace', 'mongo', 'mongo-db-dr-0', '9001:27017'],\n", - " stdout=subprocess.DEVNULL,\n", - " stderr=subprocess.DEVNULL\n", - " )\n", - " print(\"Port-forward command issued.\")\n", - " except Exception as e:\n", - " print(f\"Error starting port-forward: {e}\")\n", - "\n", - "def ensure_port_forward():\n", - " if is_port_in_use(9001):\n", - " print(\"Port 9001 is already in use — no need to start port-forward.\")\n", - " else:\n", - " print(\"Port 9001 is NOT in use — starting port-forward.\")\n", - " start_port_forward()\n", - " time.sleep(5) # Wait a few seconds to let it establish\n", - " if is_port_in_use(9001):\n", - " print(\"Port-forward established successfully\")\n", - " else:\n", - " print(\"Failed to establish port-forward \")\n", - "\n", - "if __name__ == \"__main__\":\n", - " ensure_port_forward()\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "#-----------------------------data migration start-----------------------\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "# Configure logging to print instead of writing to a file\n", - "logging.basicConfig(level=logging.INFO, format=\"%(asctime)s - %(levelname)s - %(message)s\")\n", - "def data_migration(client):\n", - " \"\"\"Fetch, process, and store data from MongoDB.\"\"\"\n", - " try:\n", - " db = client[\"lightson\"]\n", - " logging.info(\"Connected to MongoDB successfully.\")\n", - " print(\" Connection successfully established.\")\n", - 
"\n", - "\n", - "\n", - "\n", - " # New_Energy_Saving Data\n", - " collection = db['new_energySaving']\n", - " end_date = datetime.today() - timedelta(days=1)\n", - " # Ensure all required columns exist\n", - " required_columns = [\"_id\", \"serialNo\", \"devices\", \"mitigated_co2\",\"et\" , \"total_kwh\", \"deviceMapID\", \"weather_min_temp\", \"weather_max_temp\", \"billing_amount\",\n", - " \"updatedAt\", \"ac_fan_hrs\", \"ac_run_hrs\", \"algo_status\", \"clientID\", \"cost_reduction\", \"createdAt\", \"energy_savings_ref_kwh\",\n", - " \"energy_savings_us_meter\", \"energy_savings_inv_factor\", \"energy_savings_us_calc\", \"energy_savings_savings_percent\"]\n", - "\n", - " cursor = collection.find({\"createdAt\": {\"$lt\": end_date},},\n", - " {\"_id\": 1, \"serialNo\": 1, \"devices\": 1, \"mitigated_co2\": 1, \"total_kwh\": 1, \"deviceMapID\": 1, \"weather\": 1,\n", - " \"billing_amount\": 1, \"updatedAt\": 1, \"ac_fan_hrs\": 1, \"ac_run_hrs\": 1, \"algo_status\": 1,\n", - " \"clientID\": 1, \"cost_reduction\": 1, \"createdAt\": 1, \"energy_savings\": 1})\n", - " # Convert cursor to a list\n", - " data = list(cursor)\n", - " if not data:\n", - " print(\"No data found in the specified date range.\")\n", - " else:\n", - " df_nes = pd.json_normalize(data, sep='_')\n", - " df_nes = df_nes.explode(\"devices\")\n", - " # Convert ObjectId fields to strings\n", - "\n", - " for col in ['_id', 'clientID', 'deviceMapID', 'devices', \"serialNo\"]:\n", - " if col in df_nes.columns:\n", - " df_nes[col] = df_nes[col].astype(str)\n", - "\n", - " # Convert numerical fields properly\n", - "\n", - " for col in [\"mitigated_co2\", \"energy_savings_ref_kwh\", \"energy_savings_savings_percent\"]:\n", - " if col in df_nes.columns:\n", - " df_nes[col] = pd.to_numeric(df_nes[col], errors=\"coerce\")\n", - "\n", - "\n", - " for col in required_columns:\n", - " if col not in df_nes.columns:\n", - " df_nes[col] = None\n", - "\n", - " df_nes = df_nes[required_columns]\n", - "\n", - " df_nes.drop_duplicates(subset='_id')\n", - " df_nes[\"et\"] = df_nes[\"et\"].fillna(\"\")\n", - " df_nes.rename(columns={'weather_min_temp': 'min_temp','weather_max_temp': 'max_temp','energy_savings_ref_kwh': 'ref_kwh',\n", - " 'energy_savings_us_meter': 'us_meter','energy_savings_inv_factor': 'inv_factor',\n", - " 'energy_savings_us_calc': 'us_calc','energy_savings_savings_percent': 'savings_percent'}, inplace=True)\n", - " # df_nes[\"et\"] = df_nes[\"et\"].fillna(\"0\").astype(str)\n", - " df_nes.to_parquet(\"New_Energy_Saving.parquet\", use_deprecated_int96_timestamps=True, index=False, engine='pyarrow')\n", - " df_nes = pd.read_parquet(\"New_Energy_Saving.parquet\")\n", - " print(df_nes.head(3))\n", - " print(\"New_Energy_Saving.parquet has been successfully saved.\")\n", - "\n", - " # Devices Data\n", - " df_devices = pd.DataFrame(db.devices.find())\n", - " if df_devices.empty:\n", - " logging.warning(\"No data found in 'devices' collection.\")\n", - " else:\n", - " df_devices[\"_id\"] = df_devices[\"_id\"].astype(str)\n", - " df_devices[\"assignedTo\"] = df_devices[\"assignedTo\"].astype(str)\n", - " if \"ports\" in df_devices.columns:\n", - " df_devices[\"ports\"] = df_devices[\"ports\"].apply(lambda x: list(map(str, x)) if isinstance(x, list) else x)\n", - " df_devices = df_devices.explode(\"ports\")\n", - " for column in df_devices.columns:\n", - " if column not in [\"_id\",\"__v\",\"algorithm_type\",\"assignedTo\",\"AV1_ENABLE\",\"AV1_PENDING_STATUS\",\"AV2_ENABLE\",\"AV2_PENDING_STATUS\",\"configured\",\n", - " 
\"createdAt\",\"deviceId\",\"dType\",\"IS_MAINTENANCE\",\"IS_PREDICTIVE_MAINTENANCE\",\"last_change\",\"lastConfiguredAt\",\"lastFactoryResetAt\",\"live_algo_status\",\n", - " \"lp\",\"macId\",\"modelnumber\",\"ns\",\"online\",\"OTAUpdatedAt\",\"ports\",\"proto_updatedAt\",\"protocolname\",\"status\",\"temp\",\"updatedAt\",\"version\",]:\n", - " df_devices.drop(columns=column, inplace=True)\n", - " df_devices.to_parquet(\"devices.parquet\", use_deprecated_int96_timestamps = True, index = False)\n", - " df_devices = pd.read_parquet(\"devices.parquet\")\n", - " df_devices = df_devices.drop_duplicates(subset=[\"_id\"])\n", - " print(\"Devices Data:\\n\", df_devices.head(1)) # Debugging Output\n", - "\n", - " # Ports Data\n", - " df_ports = pd.DataFrame(db.ports.find())\n", - " if df_ports.empty:\n", - " logging.warning(\"No data found in 'ports' collection.\")\n", - " else:\n", - " df_ports = df_ports.astype({\"_id\": \"string\", \"device\": \"string\", \"groupId\": \"string\"}, errors=\"ignore\")\n", - " df_ports = df_ports.astype(\"string\", errors=\"ignore\")\n", - " df_ports.to_parquet(\"ports.parquet\", use_deprecated_int96_timestamps = True, index = False)\n", - " df_ports = pd.read_parquet(\"ports.parquet\")\n", - "\n", - " print(\"Ports Data:\\n\", df_ports.head(1))\n", - "\n", - " # Clients Data\n", - " df_client = pd.DataFrame(db.clients.find())\n", - " if df_client.empty:\n", - " logging.warning(\"No data found in 'clients' collection.\")\n", - " else:\n", - " df_client[\"_id\"] = df_client[\"_id\"].astype(str)\n", - " for col in [\"users\", \"feature_configuration\"]:\n", - " if col in df_client.columns:\n", - " df_client.drop(col, axis=1, inplace=True)\n", - " if \"devices\" in df_client.columns:\n", - " df_client[\"devices\"] = df_client[\"devices\"].apply(lambda x: list(map(str, x)) if isinstance(x, list) else x)\n", - " df_client = df_client.explode(\"devices\")\n", - " for column in df_client.columns:\n", - " if column not in [\"_id\",\"devices\",\"n\",\"e\",\"m\",\"addr\",\"pin\",\"city\",\"state\",\n", - " \"country\",\"createdAt\",\"updatedAt\",\"__v\",\"ct\"]:\n", - " df_client.drop(columns=column, inplace=True)\n", - " df_client.to_parquet(\"clients.parquet\" , use_deprecated_int96_timestamps = True, index = False)\n", - " df_client = pd.read_parquet(\"clients.parquet\")\n", - " print(\"Clients Data:\\n\", df_client.head(1)) # Debugging Output\n", - "\n", - " # Device Map Data\n", - " df_deviceMap = pd.DataFrame(client.lightson.deviceMap.find())\n", - " if df_deviceMap.empty:\n", - " logging.warning(\"No data found in 'deviceMap' collection.\")\n", - " else:\n", - " df_deviceMap[\"_id\"] = df_deviceMap[\"_id\"].astype(str)\n", - " df_deviceMap[\"ac_age\"] = df_deviceMap[\"ac_age\"].astype(str)\n", - " if \"clientID\" in df_deviceMap.columns:\n", - " df_deviceMap[\"clientID\"] = df_deviceMap[\"clientID\"].astype(str)\n", - " if \"devices\" in df_deviceMap.columns:\n", - " df_deviceMap[\"devices\"] = df_deviceMap[\"devices\"].apply(lambda x: list(map(str, x)) if isinstance(x, list) else x)\n", - " df_deviceMap = df_deviceMap.explode(\"devices\")\n", - " if \"facilityId\" in df_deviceMap.columns:\n", - " df_deviceMap[\"facilityId\"] = df_deviceMap[\"facilityId\"].astype(str)\n", - " df_devicemap2 = df_deviceMap.copy(deep = True)\n", - " df_deviceMap = pd.concat([df_devicemap2.drop(\"toggle_threshold\", axis = 1), pd.json_normalize(df_devicemap2[\"toggle_threshold\"], sep = \"_\").add_prefix(\"toggle_threshold_\")])\n", - " for column in df_deviceMap.columns:\n", - " if 
column not in [\"_id\",\"__v\",\"ac_age\",\"ac_brand\",\"ac_model_number\",\"ac_ton\",\"ac_type\",\"atm_id\",\"auto_params_calculation\",\"city\",\"client_type\",\"clientID\",\"cool_down\",\n", - " \"createdAt\",\"current_kwh\",\"current_threshold\",\"devices\",\"Efan\",\"fan_consumption\",\"fan_off_equal\",\"fan_threshold\",\"fm_name\",\"fromdate\",\"installation_date\",\n", - " \"latest_ES_Date\",\"Meter\",\"mode_threshold\",\"facilityId\",\"model\",\"NAC\",\"perUnitCost\",\"phase\",\"power_correction\",\"ref_kwh\",\"run_hrs\",\"sched_lock\",\n", - " \"serialNo\",\"SIM_number\",\"site\",\"site_category\",\"site_status\",\"temp_threshold\",\"todate\",\"total_real_energy\",\"updatedAt\",\"sched_lock\",\n", - " \"sched_executed_at\",\"sched_executed_pwr\",\"sched_imminent\",\"sched_last_mismatch_at\",\"sched_corrected_at\",\"star_rating\",\"toggle_threshold_on2off\",\n", - " \"toggle_threshold_off2on\",\"toggle_threshold_notfan2fan\",\"toggle_threshold_fan2notfan\"\n", - " ]:\n", - " df_deviceMap.drop(columns=column, inplace=True)\n", - " df_deviceMap.to_parquet(\"deviceMap.parquet\", use_deprecated_int96_timestamps = True, index = False)\n", - " df_deviceMap = pd.read_parquet(\"deviceMap.parquet\")\n", - " df_deviceMap = pd.read_parquet(\"deviceMap.parquet\").drop_duplicates(subset=['_id'], keep='first').reset_index(drop=True)\n", - "\n", - " print(\"Device Map Data:\\n\", df_deviceMap.head(1))\n", - "\n", - "\n", - " # daily_dump\n", - "\n", - " df_dump = pd.read_csv(f\"https://docs.google.com/spreadsheets/d/{'16CfL8N28-2nqUxWkbRGQVKlGViW0SoGlbsX4cUIzxKY'}/export?format=csv\")\n", - " df_dump.columns = df_dump.columns.str.replace(' ', '_')\n", - " df_dump = df_dump.astype(\"string\", errors=\"ignore\")\n", - " df_dump[\"Date\"] = pd.to_datetime(df_dump[\"Date\"], format = \"%d/%m/%Y\", errors=\"coerce\")\n", - " df_dump.to_parquet(\"daily_dump.parquet\", index = False, use_deprecated_int96_timestamps=True)\n", - " df_dump = pd.read_parquet(\"daily_dump.parquet\")\n", - "\n", - "\n", - "\n", - "\n", - "\n", - " df_deviceMap = pd.read_parquet(\"deviceMap.parquet\")\n", - " df_clients = pd.read_parquet(\"clients.parquet\")\n", - " df_ports = pd.read_parquet(\"ports.parquet\")\n", - " df_nes = pd.read_parquet(\"New_Energy_Saving.parquet\")\n", - " df_devices = pd.read_parquet(\"devices.parquet\")\n", - " df_dump = pd.read_parquet(\"daily_dump.parquet\")\n", - " # Correct dictionary (keys should be strings, not DataFrames)\n", - " dfs = {\"devicemap\": df_deviceMap,\"clients\": df_clients,\"ports\": df_ports,\"New_Energy_Saving\": df_nes,\"devices\": df_devices ,\"daily_dump\":df_dump}\n", - "\n", - " session = boto3.Session(\n", - " aws_access_key_id = \"AKIARXCGNNYDRHFBBB6I\",\n", - " aws_secret_access_key = \"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\",\n", - " region_name=\"ap-south-1\")\n", - "\n", - " s3_path = \"s3://daily-dump-bucket/daily_dump\"\n", - "\n", - "\n", - " conn_redshift = wr.redshift.connect(secret_id=\"sqlworkbench!d5e4ab06-e3af-4c43-9bb9-d48649ab72cb\", boto3_session=session)\n", - "\n", - " # Loop through DataFrames and print their sizes\n", - " dfs = {\"devicemap\": df_deviceMap,\"clients\": df_clients,\"ports\": df_ports,\"new_energy_saving\": df_nes,\"devices\": df_devices ,\"daily_dump\": df_dump}\n", - " for name, df in dfs.items():\n", - " wr.redshift.copy(df=df,\n", - " path= s3_path,\n", - " con=conn_redshift,\n", - " schema=\"lightson\",\n", - " table=name,\n", - " mode=\"overwrite\",\n", - " varchar_lengths_default=65535,\n", - " boto3_session=session)\n", - " 
time.sleep(5) # Delay to prevent rapid successive writes\n", - "\n", - "\n", - " logging.info(\"Data migration completed successfully.\")\n", - " return df_devices, df_ports, df_client, df_deviceMap , df_nes , df_dump\n", - "\n", - "\n", - " wr.redshift.copy(df=df_dump,\n", - " path= s3_path,\n", - " con=conn_redshift,\n", - " schema=\"lightson\",\n", - " table=daily_dump_historical,\n", - " mode=\"append\",\n", - " varchar_lengths_default=65535,\n", - " boto3_session=session)\n", - "\n", - " except errors.ServerSelectionTimeoutError:\n", - " logging.error(\" MongoDB connection failed! Is the server running?\")\n", - " return None\n", - " except Exception as e:\n", - " logging.error(f\" Unexpected error: {e}\")\n", - " return None\n", - "\n", - "\n", - "# Example usage\n", - "try:\n", - " client = MongoClient(\"mongodb://localhost:9001/lightson?directConnection=true&serverSelectionTimeoutMS=5000\")\n", - " logging.info(\"MongoDB client created successfully.\")\n", - "\n", - " df_devices, df_ports, df_client, df_deviceMap , df_nes , df_dump = data_migration(client)\n", - "\n", - " # Final check if data was retrieved\n", - " if df_nes is not None:\n", - " print(\"\\n Migration completed successfully.\")\n", - " else:\n", - " print(\"\\n Migration failed. Check logs for details.\")\n", - "except Exception as e:\n", - " logging.error(f\" Fatal error in script: {e}\")\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "#---------------------------------------------dls migration ------------------------------\n", - "\n", - "\n", - "import pandas as pd\n", - "import boto3\n", - "import awswrangler as wr\n", - "from datetime import datetime, timedelta\n", - "from pymongo import MongoClient\n", - "from pathlib import Path\n", - "import os\n", - "import polars as pl\n", - "from pytz import timezone\n", - "\n", - "# Constants\n", - "UTC = timezone('UTC')\n", - "IST = timezone('Asia/Kolkata')\n", - "\n", - "def main():\n", - " try:\n", - " # Initialize connections and settings\n", - " client = MongoClient(\"mongodb://localhost:9001/lightson?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+1.8.0\")\n", - " db = client['lightson']\n", - " collection = db['dls']\n", - "\n", - " s3_path = \"s3://daily-dump-bucket/ankur-v\"\n", - " session = boto3.Session(\n", - " aws_access_key_id=\"AKIARXCGNNYDRHFBBB6I\",\n", - " aws_secret_access_key=\"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\",\n", - " region_name=\"ap-south-1\"\n", - " )\n", - "\n", - " conn_redshift = wr.redshift.connect(secret_id=\"sqlworkbench!d5e4ab06-e3af-4c43-9bb9-d48649ab72cb\", boto3_session=session)\n", - "\n", - " # Clean up previous file if exists\n", - " Path(\"dls.csv\").unlink(missing_ok=True)\n", - "\n", - " # Batch size for processing\n", - " batch_size = 200000\n", - " output_file = \"dls.csv\"\n", - "\n", - " # Get yesterday's date in UTC (12:00 AM IST)\n", - " start_date = (datetime.today() - timedelta(days=1)).replace(tzinfo=IST).astimezone(UTC).replace(tzinfo=None)\n", - "\n", - " # Define the expected columns (schema) for the CSV file\n", - " columns = [\n", - " \"_id\", \"__v\", \"m\", \"t\", \"pl_m\", \"pl_v\", \"pl_Algo_flag\", \"pl_pir\", \"pl_DS\", \"pl_F\",\n", - " \"pl_https_exp\", \"pl_Humidity\", \"pl_LMF\", \"pl_Mo\", \"pl_mqtts_exp\", \"pl_multiple_flag\",\n", - " \"pl_NS\", \"pl_online\", \"dId\", \"cId\", \"ports\", \"updatedAt\", \"createdAt\",\n", - " \"documentUpdatedAt\", 
\"pl_tz_set\", \"pl_timestamp\", \"pl_oem\",\n", - " \"pl_off_pkt\", \"pl_OL\", \"pl_P\", \"pl_pf\", \"pl_ph_ang\", \"pl_proto\", \"pl_publish\",\n", - " \"pl_room_humid\", \"pl_room_temp\", \"pl_sens_pos\", \"pl_single_flag\", \"pl_T\",\n", - " \"pl_Tempc\", \"pl_Time\", \"pl_tot_run_hrs\", \"pl_tz\", \"pl_meter\", \"pl_voltage\",\n", - " \"pl_current\", \"pl_power\", \"pl_freq\", \"pl_kwh\", \"pl_kwh_5min\",\n", - " \"pl_run_hrs_5min\", \"pl_ac_p\", \"pl_ac_mo\", \"pl_ac_fs\", \"pl_ac_temp\",\n", - " \"pl_ac_hs\", \"pl_ac_vs\", \"pl_rssi\"\n", - " ]\n", - "\n", - " # Function to process and append chunks to CSV\n", - " def process_and_append_to_csv(skip, first_chunk):\n", - " cursor = collection.find(\n", - " {\n", - " \"t\": \"p\",\n", - " \"createdAt\": {\"$gte\": start_date},\n", - " },\n", - " {\"_id\": 1, \"__v\": 1, \"m\": 1, \"t\": 1, \"pl\": 1, \"dId\": 1, \"cId\": 1, \"pVal\": 1,\n", - " \"ports\": 1, \"updatedAt\": 1, \"createdAt\": 1, \"documentUpdatedAt\": 1}\n", - " ).skip(skip).limit(batch_size)\n", - "\n", - " data = list(cursor)\n", - " if not data:\n", - " return False\n", - "\n", - " df = pd.json_normalize(data, sep=\"_\")\n", - " print(df[\"updatedAt\"].min(), df[\"updatedAt\"].max())\n", - "\n", - " if \"ports\" in df.columns:\n", - " df[\"ports\"] = df[\"ports\"].apply(lambda x: str(x[0]) if isinstance(x, list) and x else None)\n", - "\n", - " df.loc[:, ~df.columns.isin([\"updatedAt\", \"createdAt\", \"documentUpdatedAt\"])] = \\\n", - " df.loc[:, ~df.columns.isin([\"updatedAt\", \"createdAt\", \"documentUpdatedAt\"])].astype(str)\n", - "\n", - " for col in columns:\n", - " if col not in df.columns:\n", - " df[col] = None\n", - "\n", - " df[\"pl_power\"] = df['pl_power'].replace('nan', None)\n", - " df[\"pl_Humidity\"] = df['pl_Humidity'].replace('nan', None)\n", - " df[\"pl_Tempc\"] = df['pl_Tempc'].replace('nan', None)\n", - " df[\"pl_room_temp\"] = df['pl_room_temp'].replace('nan', None)\n", - " df[\"pl_kwh_5min\"] = df['pl_kwh_5min'].replace('nan', None)\n", - "\n", - " df = df[columns]\n", - " df.to_csv(output_file, mode='a', header=first_chunk, index=False)\n", - " return True\n", - "\n", - " # Processing the data in chunks\n", - " skip = 0\n", - " first_chunk = True\n", - " chunk_number = 0\n", - "\n", - " while True:\n", - " if not process_and_append_to_csv(skip, first_chunk):\n", - " break\n", - "\n", - " chunk_number += 1\n", - " print(f\"Chunk {chunk_number} successfully appended.\")\n", - " skip += batch_size\n", - " first_chunk = False\n", - "\n", - " print(f\"Data has been written to {output_file}. 
Total chunks processed: {chunk_number}\")\n", - "\n", - " # Convert CSV to Parquet\n", - " df = pl.scan_csv(\"dls.csv\", infer_schema=False)\n", - " cols = df.collect_schema().names()\n", - " print(f\"Columns count: {len(cols)}, Unique columns (case-insensitive): {len(set(x.lower() for x in cols))}\")\n", - "\n", - " df_final = df.with_columns(\n", - " pl.col(\"createdAt\").str.to_datetime(\"%Y-%m-%d %H:%M:%S%.f\", strict=False).alias(\"createdAt\"),\n", - " pl.col(\"updatedAt\").str.to_datetime(\"%Y-%m-%d %H:%M:%S%.f\", strict=False).alias(\"updatedAt\"),\n", - " pl.col(\"documentUpdatedAt\").str.to_datetime(\"%Y-%m-%d %H:%M:%S%.f\", strict=False).alias(\"documentUpdatedAt\"),\n", - " ).collect().write_parquet(\n", - " \"dls_updated.parquet\",\n", - " compression=\"snappy\",\n", - " use_pyarrow=True,\n", - " pyarrow_options={\"use_deprecated_int96_timestamps\": True}\n", - " )\n", - "\n", - " # Filter out null createdAt records\n", - " df2 = pl.scan_parquet(\"dls_updated.parquet\")\n", - " df2.filter(pl.col(\"createdAt\").is_not_null()).collect().write_parquet(\n", - " \"dls_updated_final2.parquet\",\n", - " compression=\"snappy\",\n", - " use_pyarrow=True,\n", - " pyarrow_options={\"use_deprecated_int96_timestamps\": True}\n", - " )\n", - "\n", - " # Verify data\n", - " dls_df = pl.scan_parquet(\"dls_updated_final2.parquet\")\n", - " min_date, max_date = dls_df.select(pl.min(\"updatedAt\")).collect(), dls_df.select(pl.max(\"updatedAt\")).collect()\n", - " print(f\"Data range - Min: {min_date}, Max: {max_date}\")\n", - " print(\"dls is ready\")\n", - "\n", - " # Final cleanup and upload preparation\n", - " df = pd.read_parquet(\"dls_updated_final2.parquet\")\n", - " df[['pl_power','pl_Humidity','pl_Tempc','pl_room_temp','pl_kwh_5min']] = \\\n", - " df[['pl_power','pl_Humidity','pl_Tempc','pl_room_temp','pl_kwh_5min']].replace('nan', None)\n", - " df = df.dropna(axis=1, how='all')\n", - " df.to_parquet(\"dls_updated_final2.parquet\", use_deprecated_int96_timestamps=True)\n", - "\n", - " print(\"Data migration completed successfully!\")\n", - "\n", - " except Exception as e:\n", - " print(f\"Error during data migration: {str(e)}\")\n", - " raise\n", - " finally:\n", - " # Clean up resources\n", - " if 'client' in locals():\n", - " client.close()\n", - " if 'conn_redshift' in locals():\n", - " conn_redshift.close()\n", - "\n", - "\n", - "if __name__ == \"__main__\":\n", - " main()\n", - "\n", - "\n", - "\n", - "# ==== AWS + S3 Config ====\n", - "aws_access_key_id = 'AKIARXCGNNYDRHFBBB6I'\n", - "aws_secret_access_key = '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'\n", - "region_name = 'ap-south-1'\n", - "bucket_name = 'ankur-v'\n", - "s3_output_key = 'dls.parquet'\n", - "local_file_path = 'dls_updated_final2.parquet'\n", - "s3_path = f\"s3://{bucket_name}/{s3_output_key}\"\n", - "\n", - "# ==== Redshift Config ====\n", - "redshift_host = 'ml-test-1.cfppg9yhkagj.ap-south-1.redshift.amazonaws.com'\n", - "redshift_db = 'dev'\n", - "redshift_user = 'awsuser'\n", - "redshift_password = 'LtRedshift-2024'\n", - "redshift_port = 5439\n", - "\n", - "# ==== SQLAlchemy Config ====\n", - "engine = create_engine(\n", - " f'postgresql+psycopg2://{redshift_user}:{redshift_password}@{redshift_host}:{redshift_port}/{redshift_db}'\n", - ")\n", - "Session = sessionmaker(bind=engine)\n", - "\n", - "# ==== Step 1: Upload to S3 ====\n", - "def upload_to_s3():\n", - " print(\"🔼 Uploading file to S3...\")\n", - " session = boto3.Session(\n", - " aws_access_key_id=aws_access_key_id,\n", - " 
aws_secret_access_key=aws_secret_access_key,\n", - " region_name=region_name\n", - " )\n", - " s3 = session.resource('s3')\n", - " s3.meta.client.upload_file(Filename=local_file_path, Bucket=bucket_name, Key=s3_output_key)\n", - " print(\"✅ File uploaded to S3.\")\n", - "\n", - "# ==== Step 2: Copy to Redshift (dls_today) ====\n", - "def copy_to_redshift():\n", - " print(\"⬇️ Copying data into Redshift temporary table...\")\n", - " conn = psycopg2.connect(\n", - " host=redshift_host,\n", - " port=redshift_port,\n", - " database=redshift_db,\n", - " user=redshift_user,\n", - " password=redshift_password\n", - " )\n", - " cursor = conn.cursor()\n", - " cursor.execute(\"truncate table lightson.dls_today;\")\n", - " copy_query = f\"\"\"\n", - " COPY lightson.dls_today\n", - " FROM '{s3_path}'\n", - " CREDENTIALS 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}'\n", - " FORMAT AS PARQUET;\n", - " \"\"\"\n", - " cursor.execute(copy_query)\n", - " cursor.close()\n", - " conn.close()\n", - " print(\"✅ Copied data to dls_today.\")\n", - "\n", - "# ==== Step 3: Merge data (delete + insert) ====\n", - "def merge_into_dls():\n", - " print(\"🔄 Merging dls_today into dls...\")\n", - " try:\n", - " with Session() as session:\n", - " # Delete overlapping records\n", - " delete_result = session.execute(text(\"\"\"\n", - " DELETE FROM lightson.dls\n", - " WHERE _id IN (\n", - " SELECT _id FROM lightson.dls\n", - " INTERSECT\n", - " SELECT _id FROM lightson.dls_today\n", - " )\n", - " \"\"\"))\n", - " session.commit()\n", - " print(f\"🗑️ Rows deleted from dls: {delete_result.rowcount}\")\n", - "\n", - " with Session() as session:\n", - " # Insert new records\n", - " insert_result = session.execute(text(\"\"\"\n", - " INSERT INTO lightson.dls (\n", - " _id, __v, m, t, pl_m, pl_v, pl_algo_flag, pl_ds, pl_f, pl_https_exp,\n", - " pl_humidity, pl_lmf, pl_mo, pl_mqtts_exp, pl_multiple_flag, pl_ns,\n", - " pl_online, did, cid, ports_0, updatedat, createdat, documentupdatedat,\n", - " pl_tz_set, pl_timestamp, pl_oem, pl_off_pkt, pl_ol, pl_p, pl_pf,\n", - " pl_ph_ang, pl_proto, pl_publish, pl_room_humid, pl_room_temp,\n", - " pl_sens_pos, pl_single_flag, pl_t, pl_tempc, pl_time, pl_tot_run_hrs,\n", - " pl_tz, pl_meter, pl_voltage, pl_current, pl_power, pl_freq, pl_kwh,\n", - " pl_kwh_5min, pl_run_hrs_5min, pl_ac_p, pl_ac_mo, pl_ac_fs, pl_ac_temp,\n", - " pl_ac_hs, pl_ac_vs, pl_rssi\n", - " )\n", - " SELECT\n", - " _id, __v, m, t, pl_m, pl_v, pl_algo_flag, pl_ds,\n", - " NULL AS pl_f, NULL AS pl_https_exp, pl_humidity,\n", - " NULL AS pl_lmf, NULL AS pl_mo, NULL AS pl_mqtts_exp,\n", - " NULL AS pl_multiple_flag, pl_ns, pl_online, did, cid,\n", - " ports AS ports_0, updatedat, createdat, documentupdatedat,\n", - " pl_tz_set, pl_timestamp, pl_oem, pl_off_pkt,\n", - " NULL AS pl_ol, NULL AS pl_p, pl_pf, pl_ph_ang,\n", - " pl_proto, NULL AS pl_publish, pl_room_humid,\n", - " pl_room_temp, pl_sens_pos, NULL AS pl_single_flag,\n", - " NULL AS pl_t, pl_tempc, pl_time, pl_tot_run_hrs,\n", - " NULL AS pl_tz, pl_meter, pl_voltage, pl_current,\n", - " pl_power, pl_freq, pl_kwh, pl_kwh_5min, pl_run_hrs_5min,\n", - " pl_ac_p, pl_ac_mo, pl_ac_fs, pl_ac_temp, pl_ac_hs,\n", - " pl_ac_vs, pl_rssi\n", - " FROM lightson.dls_today\n", - " \"\"\"))\n", - " session.commit()\n", - " print(f\"✅ Rows inserted into dls: {insert_result.rowcount}\")\n", - " except Exception as e:\n", - " print(\"❌ Merge failed:\", e)\n", - " traceback.print_exc()\n", - "\n", - "# ==== Main Runner ====\n", - "if __name__ == 
\"__main__\":\n", - " try:\n", - " upload_to_s3()\n", - " copy_to_redshift()\n", - " merge_into_dls()\n", - " print(\"🎉 Data pipeline complete.\")\n", - " except Exception as e:\n", - " print(\"🔥 Fatal error:\", e)\n", - " traceback.print_exc()\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "#---------------------------------------------------------meter raw ------------------------------------------------------------\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "\n", - "import pandas as pd\n", - "import boto3\n", - "import awswrangler as wr\n", - "from datetime import datetime, timedelta\n", - "from pymongo import MongoClient\n", - "from pathlib import Path\n", - "import os\n", - "import polars as pl\n", - "from pytz import timezone\n", - "from math import ceil\n", - "import time\n", - "import psycopg2\n", - "from sqlalchemy import create_engine, text\n", - "from sqlalchemy.orm import sessionmaker\n", - "import traceback\n", - "\n", - "# Constants\n", - "IST = timezone('Asia/Kolkata')\n", - "UTC = timezone('UTC')\n", - "CHUNK_SIZE = 100000\n", - "\n", - "def meter_raw_migration():\n", - " try:\n", - " print(\"Starting meter_raw_migration process...\")\n", - "\n", - " # ==== Configuration ====\n", - " # Define start_date (yesterday) and end_date (today) in UTC\n", - " start_date = (datetime.today() - timedelta(days=1)).replace(tzinfo=IST).astimezone(UTC).replace(tzinfo=None)\n", - " end_date = datetime.utcnow()\n", - "\n", - " # AWS and Redshift configuration\n", - " aws_access_key_id = 'AKIARXCGNNYDRHFBBB6I'\n", - " aws_secret_access_key = 'LFBwt03wgZK43fJ70B6AsmrOhS5XMtncOyNaI6Zh'\n", - " region_name = 'ap-south-1'\n", - " bucket_name = 'ankur-v'\n", - " s3_output_key = 'meterraw.parquet'\n", - " redshift_host = 'ml-test-1.cfppg9yhkagj.ap-south-1.redshift.amazonaws.com'\n", - " redshift_db = 'dev'\n", - " redshift_user = 'awsuser'\n", - " redshift_password = 'LtRedshift-2024'\n", - " redshift_port = 5439\n", - "\n", - " # ==== Data Processing ====\n", - " # Create output directory\n", - " root_path = Path.cwd() / \"meterRaw\" / end_date.strftime(\"%Y_%m_%d\")\n", - " root_path.mkdir(parents=True, exist_ok=True)\n", - "\n", - " # Define schema and columns\n", - " cols = [\n", - " '_id', 'm', 'DS', 'MS', 'Power(W)', 'ac_power', 'angle', 'current_kwh',\n", - " 'fan_speed', 'freq', 'mode', 'serialNo', 'temp', 'updatedAt', 'room_temp',\n", - " 'room_humid', 'ac_hs', 'ac_vs', 'kwh_5min', 'v', 'timestamp', 'oem',\n", - " 'Algo_flag', 'createdAt', 'current_phase_1', 'current_phase_2',\n", - " 'current_phase_3', 'voltage_phase_1', 'voltage_phase_2', 'voltage_phase_3',\n", - " 'power_factor_phase_1', 'power_factor_phase_2', 'power_factor_phase_3'\n", - " ]\n", - "\n", - " schema = {\n", - " \"_id\": \"string\",\n", - " \"updatedAt\": \"datetime64[ns]\",\n", - " \"createdAt\": \"datetime64[ns]\",\n", - " \"m\": \"string\",\n", - " \"current_phase_1\": \"Float64\",\n", - " \"current_phase_2\": \"Float64\",\n", - " \"current_phase_3\": \"Float64\",\n", - " \"DS\": \"boolean\",\n", - " \"MS\": \"boolean\",\n", - " \"Power(W)\": \"Float64\",\n", - " \"voltage_phase_1\": \"Float64\",\n", - " \"voltage_phase_2\": \"Float64\",\n", - " \"voltage_phase_3\": \"Float64\",\n", - " \"ac_power\": \"Int64\",\n", - " \"angle\": \"Float64\",\n", - " \"current_kwh\": \"Float64\",\n", - " \"fan_speed\": \"Int64\",\n", - " 
\"freq\": \"Float64\",\n", - " \"mode\": \"Int64\",\n", - " \"power_factor_phase_1\": \"Float64\",\n", - " \"power_factor_phase_2\": \"Float64\",\n", - " \"power_factor_phase_3\": \"Float64\",\n", - " \"serialNo\": \"string\",\n", - " \"temp\": \"Float64\",\n", - " \"room_temp\": \"Float64\",\n", - " \"room_humid\": \"Float64\",\n", - " \"ac_hs\": \"Int64\",\n", - " \"ac_vs\": \"Int64\",\n", - " \"kwh_5min\": \"Float64\",\n", - " \"v\": \"string\",\n", - " \"timestamp\": \"string\",\n", - " \"oem\": \"string\",\n", - " \"Algo_flag\": \"string\"\n", - " }\n", - "\n", - " # MongoDB connection\n", - " client = MongoClient(\"mongodb://localhost:9001/lightson?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+1.8.0\")\n", - " db = client.get_database(\"lightson\")\n", - " coll = db.get_collection(\"meterRaw\")\n", - "\n", - " # Query definitions\n", - " query = {\n", - " \"single\": {\n", - " \"Current.0\": {\"$exists\": False}\n", - " },\n", - " \"three\": {\n", - " \"Current.0\": {\"$exists\": True}\n", - " }\n", - " }\n", - "\n", - " # Helper function for chunk processing\n", - " def get_chunk(rows: int, completed_count: int = 0, chunk_size=CHUNK_SIZE):\n", - " chunks = ceil((rows - completed_count) / chunk_size)\n", - " for i in range(chunks):\n", - " skip = completed_count + (i * chunk_size)\n", - " limit = min(rows - skip, chunk_size)\n", - " yield skip, limit\n", - "\n", - " # ==== Process Single Phase Data ====\n", - " print(\"Processing single phase data...\")\n", - " query_single_phase = query[\"single\"].copy()\n", - " query_single_phase.update({\n", - " \"updatedAt\": {\n", - " \"$gte\": start_date,\n", - " \"$lte\": end_date\n", - " }\n", - " })\n", - "\n", - " count = coll.count_documents(query_single_phase)\n", - " idx = 0\n", - "\n", - " for skip, limit in get_chunk(count):\n", - " df = pd.DataFrame(list(coll.find(query_single_phase, skip=skip, limit=limit)))\n", - " print(f\"Processing single phase chunk {idx}: {skip} to {skip+limit} | Date range: {df['updatedAt'].min()} to {df['updatedAt'].max()}\")\n", - "\n", - " df[\"_id\"] = df[\"_id\"].astype(str)\n", - " df = df.rename(columns={\n", - " \"Current\": \"current_phase_1\",\n", - " \"Voltage\": \"voltage_phase_1\",\n", - " \"power_factor\": \"power_factor_phase_1\"\n", - " }).reindex(columns=cols).astype(schema)\n", - "\n", - " df.to_parquet(root_path / f\"single_phase_idx_{idx}.parquet\", use_deprecated_int96_timestamps=True)\n", - " idx += 1\n", - "\n", - " # ==== Process Three Phase Data ====\n", - " print(\"Processing three phase data...\")\n", - " query_three_phase = query[\"three\"].copy()\n", - " query_three_phase.update({\n", - " \"updatedAt\": {\n", - " \"$gte\": start_date,\n", - " \"$lte\": end_date\n", - " }\n", - " })\n", - "\n", - " count = coll.count_documents(query_three_phase)\n", - " idx = 0\n", - "\n", - " for skip, limit in get_chunk(count):\n", - " df = pd.DataFrame(list(coll.find(query_three_phase, skip=skip, limit=limit)))\n", - " print(f\"Processing three phase chunk {idx}: {skip} to {skip+limit} | Date range: {df['updatedAt'].min()} to {df['updatedAt'].max()}\")\n", - "\n", - " # Expand array columns\n", - " for key in (\"Current\", \"Voltage\", \"power_factor\"):\n", - " array_col = df.pop(key)\n", - " df[[f\"{key.lower()}_phase_1\", f\"{key.lower()}_phase_2\", f\"{key.lower()}_phase_3\"]] = array_col.to_list()\n", - "\n", - " df[\"_id\"] = df[\"_id\"].astype(str)\n", - " df = df.reindex(columns=cols).astype(schema)\n", - " df.to_parquet(root_path / 
f\"three_phase_idx_{idx}.parquet\", use_deprecated_int96_timestamps=True)\n", - " idx += 1\n", - "\n", - " # ==== Combine and Finalize Data ====\n", - " print(\"Combining all data files...\")\n", - " df = pl.scan_parquet(root_path / \"*.parquet\")\n", - " final_filename = f\"meterRaw_{end_date.strftime('%Y_%m_%d')}.parquet\"\n", - " df.collect().write_parquet(\n", - " Path.cwd() / final_filename,\n", - " compression=\"snappy\",\n", - " use_pyarrow=True,\n", - " pyarrow_options={\"use_deprecated_int96_timestamps\": True}\n", - " )\n", - "\n", - " # Clean up temporary files\n", - " for children in root_path.glob(\"*.parquet\"):\n", - " children.unlink(missing_ok=True)\n", - " root_path.rmdir()\n", - "\n", - " # Verify final file\n", - " df2 = pl.scan_parquet(Path.cwd() / final_filename)\n", - " date_range = df2.select([\n", - " pl.col(\"createdAt\").min().alias(\"min\"),\n", - " pl.col(\"createdAt\").max().alias(\"max\")\n", - " ]).collect()\n", - " print(f\"Data range in final file: {date_range}\")\n", - " print(\"meterraw_is_generated\")\n", - "\n", - " # ==== Upload to S3 ====\n", - " print(\"🔼 Uploading MeterRaw to S3...\")\n", - " session = boto3.Session(\n", - " aws_access_key_id=aws_access_key_id,\n", - " aws_secret_access_key=aws_secret_access_key,\n", - " region_name=region_name\n", - " )\n", - " s3 = session.resource('s3')\n", - " s3.meta.client.upload_file(\n", - " Filename=final_filename,\n", - " Bucket=bucket_name,\n", - " Key=s3_output_key\n", - " )\n", - " print(\"✅ File uploaded to S3.\")\n", - "\n", - " # ==== Redshift Operations ====\n", - " # Initialize SQLAlchemy engine\n", - " engine = create_engine(\n", - " f'postgresql+psycopg2://{redshift_user}:{redshift_password}@{redshift_host}:{redshift_port}/{redshift_db}'\n", - " )\n", - " Session = sessionmaker(bind=engine)\n", - "\n", - " # ==== Copy to Redshift (meter_today) ====\n", - " print(\"⬇️ Copying data into Redshift meter_today...\")\n", - " s3_path = f\"s3://{bucket_name}/{s3_output_key}\"\n", - "\n", - "\n", - "\n", - " conn = psycopg2.connect(\n", - " host=redshift_host,\n", - " port=redshift_port,\n", - " database=redshift_db,\n", - " user=redshift_user,\n", - " password=redshift_password)\n", - "\n", - "\n", - "\n", - "\n", - " cursor = conn.cursor()\n", - " cursor.execute(\"TRUNCATE TABLE lightson.meter_today;\")\n", - " cursor.execute(f\"\"\"COPY lightson.meter_today FROM '{s3_path}' CREDENTIALS 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}' FORMAT AS PARQUET;\"\"\")\n", - " conn.commit()\n", - " print(\"✅ Copied data to meter_today.\")\n", - "\n", - " # ==== Merge into meterraw ====\n", - " print(\"🔄 Merging meter_today into meterraw...\")\n", - " try:\n", - " with Session() as session:\n", - " # Delete duplicates\n", - " delete_result = session.execute(text(\"\"\"\n", - " DELETE FROM lightson.meterraw\n", - " WHERE _id IN (\n", - " SELECT _id FROM lightson.meterraw\n", - " INTERSECT\n", - " SELECT _id FROM lightson.meter_today\n", - " )\n", - " \"\"\"))\n", - " session.commit()\n", - " print(f\"🗑️ Deleted rows from meterraw: {delete_result.rowcount}\")\n", - "\n", - " # Insert new records\n", - " insert_result = session.execute(text(\"\"\"\n", - " INSERT INTO lightson.meterraw (\n", - " _id, m, ds, ms, power_w_, ac_power, angle, current_kwh, fan_speed,\n", - " freq, mode, serialno, temp, updatedat, createdat, current_phase_1,\n", - " current_phase_2, current_phase_3, voltage_phase_1, voltage_phase_2,\n", - " voltage_phase_3, power_factor_phase_1, power_factor_phase_2, 
power_factor_phase_3,\n", - " room_temp, room_humid, ac_hs, ac_vs, kwh_5min, v, \"timestamp\", oem, algo_flag\n", - " )\n", - " SELECT\n", - " _id, m,\n", - " CAST(ds AS BOOLEAN),\n", - " CAST(ms AS BOOLEAN),\n", - " \"power(w)\" AS power_w_,\n", - " ac_power, angle, current_kwh, fan_speed, freq, mode, serialno, temp,\n", - " updatedat, createdat, current_phase_1, current_phase_2, current_phase_3,\n", - " voltage_phase_1, voltage_phase_2, voltage_phase_3,\n", - " power_factor_phase_1, power_factor_phase_2, power_factor_phase_3,\n", - " room_temp, room_humid, ac_hs, ac_vs, kwh_5min, v, \"timestamp\", oem, algo_flag\n", - " FROM lightson.meter_today\n", - " \"\"\"))\n", - " session.commit()\n", - " print(f\"✅ Inserted rows into meterraw: {insert_result.rowcount}\")\n", - "\n", - " except Exception as e:\n", - " print(\"❌ Merge failed:\", e)\n", - " traceback.print_exc()\n", - " raise\n", - "\n", - " print(\"🎉 MeterRaw pipeline complete.\")\n", - "\n", - " except Exception as e:\n", - " print(\"🔥 Fatal error in pipeline:\", e)\n", - " traceback.print_exc()\n", - " raise\n", - " finally:\n", - " # Clean up resources\n", - " if 'client' in locals():\n", - " client.close()\n", - " if 'engine' in locals():\n", - " engine.dispose()\n", - " print(\"Resources cleaned up.\")\n", - "\n", - "if __name__ == \"__main__\":\n", - " meter_raw_migration()\n" - ] - }, - { - "cell_type": "code", - "source": [], - "metadata": { - "id": "0qJ5OXpnBdzk" - }, - "execution_count": null, - "outputs": [] - } - ] -} \ No newline at end of file +import time +import logging +import traceback +from math import ceil +from os import getenv +from datetime import datetime, timedelta +from pathlib import Path + +import pandas as pd +import polars as pl +import pytz +from zoneinfo import ZoneInfo + +import boto3 +import awswrangler as wr +import redshift_connector +import psycopg2 + +from pymongo import MongoClient, errors +from pymongo.database import Database +from pymongo.collection import Collection + +from sqlalchemy import create_engine, text +from sqlalchemy.orm import sessionmaker + + + + + +IST = pytz.timezone('Asia/Kolkata') +now = datetime.now(IST) +print(now.strftime('%Y-%m-%d %H:%M:%S')) + + + +UTC = pytz.UTC # or: pytz.timezone('UTC') +now = datetime.now(UTC) +print(now) + + + + +#----------------------port forwarding --------------------- + + +import subprocess +import socket +import time + +def is_port_in_use(port): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + return s.connect_ex(('localhost', port)) == 0 + +def start_port_forward(): + try: + print("Starting port-forward...") + # You can add full path to ./kubectl if needed + subprocess.Popen( + ['./kubectl', 'port-forward', '--namespace', 'mongo', 'mongo-db-dr-0', '9001:27017'], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL + ) + print("Port-forward command issued.") + except Exception as e: + print(f"Error starting port-forward: {e}") + +def ensure_port_forward(): + if is_port_in_use(9001): + print("Port 9001 is already in use — no need to start port-forward.") + else: + print("Port 9001 is NOT in use — starting port-forward.") + start_port_forward() + time.sleep(5) # Wait a few seconds to let it establish + if is_port_in_use(9001): + print("Port-forward established successfully") + else: + print("Failed to establish port-forward ") + +if __name__ == "__main__": + ensure_port_forward() + + + + + +#-----------------------------data migration start----------------------- + + + + + +# Configure logging to print instead of writing to 
a file +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") +def data_migration(client): + """Fetch, process, and store data from MongoDB.""" + try: + db = client["lightson"] + logging.info("Connected to MongoDB successfully.") + print(" Connection successfully established.") + + + + + # New_Energy_Saving Data + collection = db['new_energySaving'] + end_date = datetime.today() - timedelta(days=1) + # Ensure all required columns exist + required_columns = ["_id", "serialNo", "devices", "mitigated_co2","et" , "total_kwh", "deviceMapID", "weather_min_temp", "weather_max_temp", "billing_amount", + "updatedAt", "ac_fan_hrs", "ac_run_hrs", "algo_status", "clientID", "cost_reduction", "createdAt", "energy_savings_ref_kwh", + "energy_savings_us_meter", "energy_savings_inv_factor", "energy_savings_us_calc", "energy_savings_savings_percent"] + + cursor = collection.find({"createdAt": {"$lt": end_date},}, + {"_id": 1, "serialNo": 1, "devices": 1, "mitigated_co2": 1, "total_kwh": 1, "deviceMapID": 1, "weather": 1, + "billing_amount": 1, "updatedAt": 1, "ac_fan_hrs": 1, "ac_run_hrs": 1, "algo_status": 1, + "clientID": 1, "cost_reduction": 1, "createdAt": 1, "energy_savings": 1}) + # Convert cursor to a list + data = list(cursor) + if not data: + print("No data found in the specified date range.") + else: + df_nes = pd.json_normalize(data, sep='_') + df_nes = df_nes.explode("devices") + # Convert ObjectId fields to strings + + for col in ['_id', 'clientID', 'deviceMapID', 'devices', "serialNo"]: + if col in df_nes.columns: + df_nes[col] = df_nes[col].astype(str) + + # Convert numerical fields properly + + for col in ["mitigated_co2", "energy_savings_ref_kwh", "energy_savings_savings_percent"]: + if col in df_nes.columns: + df_nes[col] = pd.to_numeric(df_nes[col], errors="coerce") + + + for col in required_columns: + if col not in df_nes.columns: + df_nes[col] = None + + df_nes = df_nes[required_columns] + + df_nes.drop_duplicates(subset='_id') + df_nes["et"] = df_nes["et"].fillna("") + df_nes.rename(columns={'weather_min_temp': 'min_temp','weather_max_temp': 'max_temp','energy_savings_ref_kwh': 'ref_kwh', + 'energy_savings_us_meter': 'us_meter','energy_savings_inv_factor': 'inv_factor', + 'energy_savings_us_calc': 'us_calc','energy_savings_savings_percent': 'savings_percent'}, inplace=True) + # df_nes["et"] = df_nes["et"].fillna("0").astype(str) + df_nes.to_parquet("New_Energy_Saving.parquet", use_deprecated_int96_timestamps=True, index=False, engine='pyarrow') + df_nes = pd.read_parquet("New_Energy_Saving.parquet") + print(df_nes.head(3)) + print("New_Energy_Saving.parquet has been successfully saved.") + + # Devices Data + df_devices = pd.DataFrame(db.devices.find()) + if df_devices.empty: + logging.warning("No data found in 'devices' collection.") + else: + df_devices["_id"] = df_devices["_id"].astype(str) + df_devices["assignedTo"] = df_devices["assignedTo"].astype(str) + if "ports" in df_devices.columns: + df_devices["ports"] = df_devices["ports"].apply(lambda x: list(map(str, x)) if isinstance(x, list) else x) + df_devices = df_devices.explode("ports") + for column in df_devices.columns: + if column not in ["_id","__v","algorithm_type","assignedTo","AV1_ENABLE","AV1_PENDING_STATUS","AV2_ENABLE","AV2_PENDING_STATUS","configured", + "createdAt","deviceId","dType","IS_MAINTENANCE","IS_PREDICTIVE_MAINTENANCE","last_change","lastConfiguredAt","lastFactoryResetAt","live_algo_status", + 
"lp","macId","modelnumber","ns","online","OTAUpdatedAt","ports","proto_updatedAt","protocolname","status","temp","updatedAt","version",]: + df_devices.drop(columns=column, inplace=True) + df_devices.to_parquet("devices.parquet", use_deprecated_int96_timestamps = True, index = False) + df_devices = pd.read_parquet("devices.parquet") + df_devices = df_devices.drop_duplicates(subset=["_id"]) + print("Devices Data:\n", df_devices.head(1)) # Debugging Output + + # Ports Data + df_ports = pd.DataFrame(db.ports.find()) + if df_ports.empty: + logging.warning("No data found in 'ports' collection.") + else: + df_ports = df_ports.astype({"_id": "string", "device": "string", "groupId": "string"}, errors="ignore") + df_ports = df_ports.astype("string", errors="ignore") + df_ports.to_parquet("ports.parquet", use_deprecated_int96_timestamps = True, index = False) + df_ports = pd.read_parquet("ports.parquet") + + print("Ports Data:\n", df_ports.head(1)) + + # Clients Data + df_client = pd.DataFrame(db.clients.find()) + if df_client.empty: + logging.warning("No data found in 'clients' collection.") + else: + df_client["_id"] = df_client["_id"].astype(str) + for col in ["users", "feature_configuration"]: + if col in df_client.columns: + df_client.drop(col, axis=1, inplace=True) + if "devices" in df_client.columns: + df_client["devices"] = df_client["devices"].apply(lambda x: list(map(str, x)) if isinstance(x, list) else x) + df_client = df_client.explode("devices") + for column in df_client.columns: + if column not in ["_id","devices","n","e","m","addr","pin","city","state", + "country","createdAt","updatedAt","__v","ct"]: + df_client.drop(columns=column, inplace=True) + df_client.to_parquet("clients.parquet" , use_deprecated_int96_timestamps = True, index = False) + df_client = pd.read_parquet("clients.parquet") + print("Clients Data:\n", df_client.head(1)) # Debugging Output + + # Device Map Data + df_deviceMap = pd.DataFrame(client.lightson.deviceMap.find()) + if df_deviceMap.empty: + logging.warning("No data found in 'deviceMap' collection.") + else: + df_deviceMap["_id"] = df_deviceMap["_id"].astype(str) + df_deviceMap["ac_age"] = df_deviceMap["ac_age"].astype(str) + if "clientID" in df_deviceMap.columns: + df_deviceMap["clientID"] = df_deviceMap["clientID"].astype(str) + if "devices" in df_deviceMap.columns: + df_deviceMap["devices"] = df_deviceMap["devices"].apply(lambda x: list(map(str, x)) if isinstance(x, list) else x) + df_deviceMap = df_deviceMap.explode("devices") + if "facilityId" in df_deviceMap.columns: + df_deviceMap["facilityId"] = df_deviceMap["facilityId"].astype(str) + df_devicemap2 = df_deviceMap.copy(deep = True) + df_deviceMap = pd.concat([df_devicemap2.drop("toggle_threshold", axis = 1), pd.json_normalize(df_devicemap2["toggle_threshold"], sep = "_").add_prefix("toggle_threshold_")]) + for column in df_deviceMap.columns: + if column not in ["_id","__v","ac_age","ac_brand","ac_model_number","ac_ton","ac_type","atm_id","auto_params_calculation","city","client_type","clientID","cool_down", + "createdAt","current_kwh","current_threshold","devices","Efan","fan_consumption","fan_off_equal","fan_threshold","fm_name","fromdate","installation_date", + "latest_ES_Date","Meter","mode_threshold","facilityId","model","NAC","perUnitCost","phase","power_correction","ref_kwh","run_hrs","sched_lock", + "serialNo","SIM_number","site","site_category","site_status","temp_threshold","todate","total_real_energy","updatedAt","sched_lock", + 
"sched_executed_at","sched_executed_pwr","sched_imminent","sched_last_mismatch_at","sched_corrected_at","star_rating","toggle_threshold_on2off", + "toggle_threshold_off2on","toggle_threshold_notfan2fan","toggle_threshold_fan2notfan" + ]: + df_deviceMap.drop(columns=column, inplace=True) + df_deviceMap.to_parquet("deviceMap.parquet", use_deprecated_int96_timestamps = True, index = False) + df_deviceMap = pd.read_parquet("deviceMap.parquet") + df_deviceMap = pd.read_parquet("deviceMap.parquet").drop_duplicates(subset=['_id'], keep='first').reset_index(drop=True) + + print("Device Map Data:\n", df_deviceMap.head(1)) + + + # daily_dump + + df_dump = pd.read_csv(f"https://docs.google.com/spreadsheets/d/{'16CfL8N28-2nqUxWkbRGQVKlGViW0SoGlbsX4cUIzxKY'}/export?format=csv") + df_dump.columns = df_dump.columns.str.replace(' ', '_') + df_dump = df_dump.astype("string", errors="ignore") + df_dump["Date"] = pd.to_datetime(df_dump["Date"], format = "%d/%m/%Y", errors="coerce") + df_dump.to_parquet("daily_dump.parquet", index = False, use_deprecated_int96_timestamps=True) + df_dump = pd.read_parquet("daily_dump.parquet") + + + + + + df_deviceMap = pd.read_parquet("deviceMap.parquet") + df_clients = pd.read_parquet("clients.parquet") + df_ports = pd.read_parquet("ports.parquet") + df_nes = pd.read_parquet("New_Energy_Saving.parquet") + df_devices = pd.read_parquet("devices.parquet") + df_dump = pd.read_parquet("daily_dump.parquet") + # Correct dictionary (keys should be strings, not DataFrames) + dfs = {"devicemap": df_deviceMap,"clients": df_clients,"ports": df_ports,"New_Energy_Saving": df_nes,"devices": df_devices ,"daily_dump":df_dump} + + session = boto3.Session( + aws_access_key_id = "AKIARXCGNNYDRHFBBB6I", + aws_secret_access_key = "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", + region_name="ap-south-1") + + s3_path = "s3://daily-dump-bucket/daily_dump" + + + conn_redshift = wr.redshift.connect(secret_id="sqlworkbench!d5e4ab06-e3af-4c43-9bb9-d48649ab72cb", boto3_session=session) + + # Loop through DataFrames and print their sizes + dfs = {"devicemap": df_deviceMap,"clients": df_clients,"ports": df_ports,"new_energy_saving": df_nes,"devices": df_devices ,"daily_dump": df_dump} + for name, df in dfs.items(): + wr.redshift.copy(df=df, + path= s3_path, + con=conn_redshift, + schema="lightson", + table=name, + mode="overwrite", + varchar_lengths_default=65535, + boto3_session=session) + time.sleep(5) # Delay to prevent rapid successive writes + + + logging.info("Data migration completed successfully.") + return df_devices, df_ports, df_client, df_deviceMap , df_nes , df_dump + + + wr.redshift.copy(df=df_dump, + path= s3_path, + con=conn_redshift, + schema="lightson", + table=daily_dump_historical, + mode="append", + varchar_lengths_default=65535, + boto3_session=session) + + except errors.ServerSelectionTimeoutError: + logging.error(" MongoDB connection failed! Is the server running?") + return None + except Exception as e: + logging.error(f" Unexpected error: {e}") + return None + + +# Example usage +try: + client = MongoClient("mongodb://localhost:9001/lightson?directConnection=true&serverSelectionTimeoutMS=5000") + logging.info("MongoDB client created successfully.") + + df_devices, df_ports, df_client, df_deviceMap , df_nes , df_dump = data_migration(client) + + # Final check if data was retrieved + if df_nes is not None: + print("\n Migration completed successfully.") + else: + print("\n Migration failed. 
Check logs for details.") +except Exception as e: + logging.error(f" Fatal error in script: {e}") + + + + + + + + + + + + + + + + + + + + +#---------------------------------------------dls migration ------------------------------ + + +import pandas as pd +import boto3 +import awswrangler as wr +from datetime import datetime, timedelta +from pymongo import MongoClient +from pathlib import Path +import os +import polars as pl +from pytz import timezone + +# Constants +UTC = timezone('UTC') +IST = timezone('Asia/Kolkata') + +def main(): + try: + # Initialize connections and settings + client = MongoClient("mongodb://localhost:9001/lightson?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+1.8.0") + db = client['lightson'] + collection = db['dls'] + + s3_path = "s3://daily-dump-bucket/ankur-v" + session = boto3.Session( + aws_access_key_id="AKIARXCGNNYDRHFBBB6I", + aws_secret_access_key="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", + region_name="ap-south-1" + ) + + conn_redshift = wr.redshift.connect(secret_id="sqlworkbench!d5e4ab06-e3af-4c43-9bb9-d48649ab72cb", boto3_session=session) + + # Clean up previous file if exists + Path("dls.csv").unlink(missing_ok=True) + + # Batch size for processing + batch_size = 200000 + output_file = "dls.csv" + + # Get yesterday's date in UTC (12:00 AM IST) + start_date = (datetime.today() - timedelta(days=1)).replace(tzinfo=IST).astimezone(UTC).replace(tzinfo=None) + + # Define the expected columns (schema) for the CSV file + columns = [ + "_id", "__v", "m", "t", "pl_m", "pl_v", "pl_Algo_flag", "pl_pir", "pl_DS", "pl_F", + "pl_https_exp", "pl_Humidity", "pl_LMF", "pl_Mo", "pl_mqtts_exp", "pl_multiple_flag", + "pl_NS", "pl_online", "dId", "cId", "ports", "updatedAt", "createdAt", + "documentUpdatedAt", "pl_tz_set", "pl_timestamp", "pl_oem", + "pl_off_pkt", "pl_OL", "pl_P", "pl_pf", "pl_ph_ang", "pl_proto", "pl_publish", + "pl_room_humid", "pl_room_temp", "pl_sens_pos", "pl_single_flag", "pl_T", + "pl_Tempc", "pl_Time", "pl_tot_run_hrs", "pl_tz", "pl_meter", "pl_voltage", + "pl_current", "pl_power", "pl_freq", "pl_kwh", "pl_kwh_5min", + "pl_run_hrs_5min", "pl_ac_p", "pl_ac_mo", "pl_ac_fs", "pl_ac_temp", + "pl_ac_hs", "pl_ac_vs", "pl_rssi" + ] + + # Function to process and append chunks to CSV + def process_and_append_to_csv(skip, first_chunk): + cursor = collection.find( + { + "t": "p", + "createdAt": {"$gte": start_date}, + }, + {"_id": 1, "__v": 1, "m": 1, "t": 1, "pl": 1, "dId": 1, "cId": 1, "pVal": 1, + "ports": 1, "updatedAt": 1, "createdAt": 1, "documentUpdatedAt": 1} + ).skip(skip).limit(batch_size) + + data = list(cursor) + if not data: + return False + + df = pd.json_normalize(data, sep="_") + print(df["updatedAt"].min(), df["updatedAt"].max()) + + if "ports" in df.columns: + df["ports"] = df["ports"].apply(lambda x: str(x[0]) if isinstance(x, list) and x else None) + + df.loc[:, ~df.columns.isin(["updatedAt", "createdAt", "documentUpdatedAt"])] = \ + df.loc[:, ~df.columns.isin(["updatedAt", "createdAt", "documentUpdatedAt"])].astype(str) + + for col in columns: + if col not in df.columns: + df[col] = None + + df["pl_power"] = df['pl_power'].replace('nan', None) + df["pl_Humidity"] = df['pl_Humidity'].replace('nan', None) + df["pl_Tempc"] = df['pl_Tempc'].replace('nan', None) + df["pl_room_temp"] = df['pl_room_temp'].replace('nan', None) + df["pl_kwh_5min"] = df['pl_kwh_5min'].replace('nan', None) + + df = df[columns] + df.to_csv(output_file, mode='a', header=first_chunk, index=False) + return True + + # Processing the data in chunks + 
skip = 0 + first_chunk = True + chunk_number = 0 + + while True: + if not process_and_append_to_csv(skip, first_chunk): + break + + chunk_number += 1 + print(f"Chunk {chunk_number} successfully appended.") + skip += batch_size + first_chunk = False + + print(f"Data has been written to {output_file}. Total chunks processed: {chunk_number}") + + # Convert CSV to Parquet + df = pl.scan_csv("dls.csv", infer_schema=False) + cols = df.collect_schema().names() + print(f"Columns count: {len(cols)}, Unique columns (case-insensitive): {len(set(x.lower() for x in cols))}") + + df_final = df.with_columns( + pl.col("createdAt").str.to_datetime("%Y-%m-%d %H:%M:%S%.f", strict=False).alias("createdAt"), + pl.col("updatedAt").str.to_datetime("%Y-%m-%d %H:%M:%S%.f", strict=False).alias("updatedAt"), + pl.col("documentUpdatedAt").str.to_datetime("%Y-%m-%d %H:%M:%S%.f", strict=False).alias("documentUpdatedAt"), + ).collect().write_parquet( + "dls_updated.parquet", + compression="snappy", + use_pyarrow=True, + pyarrow_options={"use_deprecated_int96_timestamps": True} + ) + + # Filter out null createdAt records + df2 = pl.scan_parquet("dls_updated.parquet") + df2.filter(pl.col("createdAt").is_not_null()).collect().write_parquet( + "dls_updated_final2.parquet", + compression="snappy", + use_pyarrow=True, + pyarrow_options={"use_deprecated_int96_timestamps": True} + ) + + # Verify data + dls_df = pl.scan_parquet("dls_updated_final2.parquet") + min_date, max_date = dls_df.select(pl.min("updatedAt")).collect(), dls_df.select(pl.max("updatedAt")).collect() + print(f"Data range - Min: {min_date}, Max: {max_date}") + print("dls is ready") + + # Final cleanup and upload preparation + df = pd.read_parquet("dls_updated_final2.parquet") + df[['pl_power','pl_Humidity','pl_Tempc','pl_room_temp','pl_kwh_5min']] = \ + df[['pl_power','pl_Humidity','pl_Tempc','pl_room_temp','pl_kwh_5min']].replace('nan', None) + df = df.dropna(axis=1, how='all') + df.to_parquet("dls_updated_final2.parquet", use_deprecated_int96_timestamps=True) + + print("Data migration completed successfully!") + + except Exception as e: + print(f"Error during data migration: {str(e)}") + raise + finally: + # Clean up resources + if 'client' in locals(): + client.close() + if 'conn_redshift' in locals(): + conn_redshift.close() + + +if __name__ == "__main__": + main() + + + +# ==== AWS + S3 Config ==== +aws_access_key_id = 'AKIARXCGNNYDRHFBBB6I' +aws_secret_access_key = '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' +region_name = 'ap-south-1' +bucket_name = 'ankur-v' +s3_output_key = 'dls.parquet' +local_file_path = 'dls_updated_final2.parquet' +s3_path = f"s3://{bucket_name}/{s3_output_key}" + +# ==== Redshift Config ==== +redshift_host = 'ml-test-1.cfppg9yhkagj.ap-south-1.redshift.amazonaws.com' +redshift_db = 'dev' +redshift_user = 'awsuser' +redshift_password = 'LtRedshift-2024' +redshift_port = 5439 + +# ==== SQLAlchemy Config ==== +engine = create_engine( + f'postgresql+psycopg2://{redshift_user}:{redshift_password}@{redshift_host}:{redshift_port}/{redshift_db}' +) +Session = sessionmaker(bind=engine) + +# ==== Step 1: Upload to S3 ==== +def upload_to_s3(): + print("🔼 Uploading file to S3...") + session = boto3.Session( + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + region_name=region_name + ) + s3 = session.resource('s3') + s3.meta.client.upload_file(Filename=local_file_path, Bucket=bucket_name, Key=s3_output_key) + print("✅ File uploaded to S3.") + +# ==== Step 2: Copy to Redshift (dls_today) ==== +def 
copy_to_redshift():
+    print("⬇️ Copying data into Redshift temporary table...")
+    conn = psycopg2.connect(
+        host=redshift_host,
+        port=redshift_port,
+        database=redshift_db,
+        user=redshift_user,
+        password=redshift_password
+    )
+    cursor = conn.cursor()
+    cursor.execute("truncate table lightson.dls_today;")
+    copy_query = f"""
+        COPY lightson.dls_today
+        FROM '{s3_path}'
+        CREDENTIALS 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}'
+        FORMAT AS PARQUET;
+    """
+    cursor.execute(copy_query)
+    conn.commit()  # psycopg2 does not autocommit: without this the TRUNCATE + COPY are rolled back on close
+    cursor.close()
+    conn.close()
+    print("✅ Copied data to dls_today.")
+
+# ==== Step 3: Merge data (delete + insert) ====
+# (delete and insert commit separately; see the single-transaction merge sketch at the end of this section)
+def merge_into_dls():
+    print("🔄 Merging dls_today into dls...")
+    try:
+        with Session() as session:
+            # Delete overlapping records
+            delete_result = session.execute(text("""
+                DELETE FROM lightson.dls
+                WHERE _id IN (
+                    SELECT _id FROM lightson.dls
+                    INTERSECT
+                    SELECT _id FROM lightson.dls_today
+                )
+            """))
+            session.commit()
+            print(f"🗑️ Rows deleted from dls: {delete_result.rowcount}")
+
+        with Session() as session:
+            # Insert new records
+            insert_result = session.execute(text("""
+                INSERT INTO lightson.dls (
+                    _id, __v, m, t, pl_m, pl_v, pl_algo_flag, pl_ds, pl_f, pl_https_exp,
+                    pl_humidity, pl_lmf, pl_mo, pl_mqtts_exp, pl_multiple_flag, pl_ns,
+                    pl_online, did, cid, ports_0, updatedat, createdat, documentupdatedat,
+                    pl_tz_set, pl_timestamp, pl_oem, pl_off_pkt, pl_ol, pl_p, pl_pf,
+                    pl_ph_ang, pl_proto, pl_publish, pl_room_humid, pl_room_temp,
+                    pl_sens_pos, pl_single_flag, pl_t, pl_tempc, pl_time, pl_tot_run_hrs,
+                    pl_tz, pl_meter, pl_voltage, pl_current, pl_power, pl_freq, pl_kwh,
+                    pl_kwh_5min, pl_run_hrs_5min, pl_ac_p, pl_ac_mo, pl_ac_fs, pl_ac_temp,
+                    pl_ac_hs, pl_ac_vs, pl_rssi
+                )
+                SELECT
+                    _id, __v, m, t, pl_m, pl_v, pl_algo_flag, pl_ds,
+                    NULL AS pl_f, NULL AS pl_https_exp, pl_humidity,
+                    NULL AS pl_lmf, NULL AS pl_mo, NULL AS pl_mqtts_exp,
+                    NULL AS pl_multiple_flag, pl_ns, pl_online, did, cid,
+                    ports AS ports_0, updatedat, createdat, documentupdatedat,
+                    pl_tz_set, pl_timestamp, pl_oem, pl_off_pkt,
+                    NULL AS pl_ol, NULL AS pl_p, pl_pf, pl_ph_ang,
+                    pl_proto, NULL AS pl_publish, pl_room_humid,
+                    pl_room_temp, pl_sens_pos, NULL AS pl_single_flag,
+                    NULL AS pl_t, pl_tempc, pl_time, pl_tot_run_hrs,
+                    NULL AS pl_tz, pl_meter, pl_voltage, pl_current,
+                    pl_power, pl_freq, pl_kwh, pl_kwh_5min, pl_run_hrs_5min,
+                    pl_ac_p, pl_ac_mo, pl_ac_fs, pl_ac_temp, pl_ac_hs,
+                    pl_ac_vs, pl_rssi
+                FROM lightson.dls_today
+            """))
+            session.commit()
+            print(f"✅ Rows inserted into dls: {insert_result.rowcount}")
+    except Exception as e:
+        print("❌ Merge failed:", e)
+        traceback.print_exc()
+
+# ==== Main Runner ====
+if __name__ == "__main__":
+    try:
+        upload_to_s3()
+        copy_to_redshift()
+        merge_into_dls()
+        print("🎉 Data pipeline complete.")
+    except Exception as e:
+        print("🔥 Fatal error:", e)
+        traceback.print_exc()
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+#---------------------------------------------------------meter raw ------------------------------------------------------------
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+import pandas as pd
+import boto3
+import awswrangler as wr
+from datetime import datetime, timedelta
+from pymongo import MongoClient
+from pathlib import Path
+import os
+import polars as pl
+from pytz import timezone
+from math import ceil
+import time
+import psycopg2
+from sqlalchemy import create_engine, text
+from sqlalchemy.orm import sessionmaker
+import traceback
+
+# Constants
+IST = timezone('Asia/Kolkata')
+UTC = 
timezone('UTC')
+CHUNK_SIZE = 100000
+
+def meter_raw_migration():
+    try:
+        print("Starting meter_raw_migration process...")
+
+        # ==== Configuration ====
+        # Query window: this time yesterday (IST) converted to naive UTC, up to the current UTC time.
+        # Use pytz's localize() rather than replace(tzinfo=IST), which would apply the LMT offset.
+        start_date = IST.localize(datetime.today() - timedelta(days=1)).astimezone(UTC).replace(tzinfo=None)
+        end_date = datetime.utcnow()
+
+        # AWS and Redshift configuration
+        aws_access_key_id = 'AKIARXCGNNYDRHFBBB6I'
+        aws_secret_access_key = 'LFBwt03wgZK43fJ70B6AsmrOhS5XMtncOyNaI6Zh'
+        region_name = 'ap-south-1'
+        bucket_name = 'ankur-v'
+        s3_output_key = 'meterraw.parquet'
+        redshift_host = 'ml-test-1.cfppg9yhkagj.ap-south-1.redshift.amazonaws.com'
+        redshift_db = 'dev'
+        redshift_user = 'awsuser'
+        redshift_password = 'LtRedshift-2024'
+        redshift_port = 5439
+
+        # ==== Data Processing ====
+        # Create output directory
+        root_path = Path.cwd() / "meterRaw" / end_date.strftime("%Y_%m_%d")
+        root_path.mkdir(parents=True, exist_ok=True)
+
+        # Define schema and columns
+        cols = [
+            '_id', 'm', 'DS', 'MS', 'Power(W)', 'ac_power', 'angle', 'current_kwh',
+            'fan_speed', 'freq', 'mode', 'serialNo', 'temp', 'updatedAt', 'room_temp',
+            'room_humid', 'ac_hs', 'ac_vs', 'kwh_5min', 'v', 'timestamp', 'oem',
+            'Algo_flag', 'createdAt', 'current_phase_1', 'current_phase_2',
+            'current_phase_3', 'voltage_phase_1', 'voltage_phase_2', 'voltage_phase_3',
+            'power_factor_phase_1', 'power_factor_phase_2', 'power_factor_phase_3'
+        ]
+
+        schema = {
+            "_id": "string",
+            "updatedAt": "datetime64[ns]",
+            "createdAt": "datetime64[ns]",
+            "m": "string",
+            "current_phase_1": "Float64",
+            "current_phase_2": "Float64",
+            "current_phase_3": "Float64",
+            "DS": "boolean",
+            "MS": "boolean",
+            "Power(W)": "Float64",
+            "voltage_phase_1": "Float64",
+            "voltage_phase_2": "Float64",
+            "voltage_phase_3": "Float64",
+            "ac_power": "Int64",
+            "angle": "Float64",
+            "current_kwh": "Float64",
+            "fan_speed": "Int64",
+            "freq": "Float64",
+            "mode": "Int64",
+            "power_factor_phase_1": "Float64",
+            "power_factor_phase_2": "Float64",
+            "power_factor_phase_3": "Float64",
+            "serialNo": "string",
+            "temp": "Float64",
+            "room_temp": "Float64",
+            "room_humid": "Float64",
+            "ac_hs": "Int64",
+            "ac_vs": "Int64",
+            "kwh_5min": "Float64",
+            "v": "string",
+            "timestamp": "string",
+            "oem": "string",
+            "Algo_flag": "string"
+        }
+
+        # MongoDB connection
+        client = MongoClient("mongodb://localhost:9001/lightson?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+1.8.0")
+        db = client.get_database("lightson")
+        coll = db.get_collection("meterRaw")
+
+        # Query definitions
+        query = {
+            "single": {
+                "Current.0": {"$exists": False}
+            },
+            "three": {
+                "Current.0": {"$exists": True}
+            }
+        }
+
+        # Helper function for chunk processing
+        def get_chunk(rows: int, completed_count: int = 0, chunk_size=CHUNK_SIZE):
+            chunks = ceil((rows - completed_count) / chunk_size)
+            for i in range(chunks):
+                skip = completed_count + (i * chunk_size)
+                limit = min(rows - skip, chunk_size)
+                yield skip, limit
+
+        # ==== Process Single Phase Data ====
+        print("Processing single phase data...")
+        query_single_phase = query["single"].copy()
+        query_single_phase.update({
+            "updatedAt": {
+                "$gte": start_date,
+                "$lte": end_date
+            }
+        })
+
+        count = coll.count_documents(query_single_phase)
+        idx = 0
+
+        for skip, limit in get_chunk(count):
+            df = pd.DataFrame(list(coll.find(query_single_phase, skip=skip, limit=limit)))
+            print(f"Processing single phase chunk {idx}: {skip} to {skip+limit} | Date range: {df['updatedAt'].min()} to 
{df['updatedAt'].max()}") + + df["_id"] = df["_id"].astype(str) + df = df.rename(columns={ + "Current": "current_phase_1", + "Voltage": "voltage_phase_1", + "power_factor": "power_factor_phase_1" + }).reindex(columns=cols).astype(schema) + + df.to_parquet(root_path / f"single_phase_idx_{idx}.parquet", use_deprecated_int96_timestamps=True) + idx += 1 + + # ==== Process Three Phase Data ==== + print("Processing three phase data...") + query_three_phase = query["three"].copy() + query_three_phase.update({ + "updatedAt": { + "$gte": start_date, + "$lte": end_date + } + }) + + count = coll.count_documents(query_three_phase) + idx = 0 + + for skip, limit in get_chunk(count): + df = pd.DataFrame(list(coll.find(query_three_phase, skip=skip, limit=limit))) + print(f"Processing three phase chunk {idx}: {skip} to {skip+limit} | Date range: {df['updatedAt'].min()} to {df['updatedAt'].max()}") + + # Expand array columns + for key in ("Current", "Voltage", "power_factor"): + array_col = df.pop(key) + df[[f"{key.lower()}_phase_1", f"{key.lower()}_phase_2", f"{key.lower()}_phase_3"]] = array_col.to_list() + + df["_id"] = df["_id"].astype(str) + df = df.reindex(columns=cols).astype(schema) + df.to_parquet(root_path / f"three_phase_idx_{idx}.parquet", use_deprecated_int96_timestamps=True) + idx += 1 + + # ==== Combine and Finalize Data ==== + print("Combining all data files...") + df = pl.scan_parquet(root_path / "*.parquet") + final_filename = f"meterRaw_{end_date.strftime('%Y_%m_%d')}.parquet" + df.collect().write_parquet( + Path.cwd() / final_filename, + compression="snappy", + use_pyarrow=True, + pyarrow_options={"use_deprecated_int96_timestamps": True} + ) + + # Clean up temporary files + for children in root_path.glob("*.parquet"): + children.unlink(missing_ok=True) + root_path.rmdir() + + # Verify final file + df2 = pl.scan_parquet(Path.cwd() / final_filename) + date_range = df2.select([ + pl.col("createdAt").min().alias("min"), + pl.col("createdAt").max().alias("max") + ]).collect() + print(f"Data range in final file: {date_range}") + print("meterraw_is_generated") + + # ==== Upload to S3 ==== + print("🔼 Uploading MeterRaw to S3...") + session = boto3.Session( + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + region_name=region_name + ) + s3 = session.resource('s3') + s3.meta.client.upload_file( + Filename=final_filename, + Bucket=bucket_name, + Key=s3_output_key + ) + print("✅ File uploaded to S3.") + + # ==== Redshift Operations ==== + # Initialize SQLAlchemy engine + engine = create_engine( + f'postgresql+psycopg2://{redshift_user}:{redshift_password}@{redshift_host}:{redshift_port}/{redshift_db}' + ) + Session = sessionmaker(bind=engine) + + # ==== Copy to Redshift (meter_today) ==== + print("⬇️ Copying data into Redshift meter_today...") + s3_path = f"s3://{bucket_name}/{s3_output_key}" + + + + conn = psycopg2.connect( + host=redshift_host, + port=redshift_port, + database=redshift_db, + user=redshift_user, + password=redshift_password) + + + + + cursor = conn.cursor() + cursor.execute("TRUNCATE TABLE lightson.meter_today;") + cursor.execute(f"""COPY lightson.meter_today FROM '{s3_path}' CREDENTIALS 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}' FORMAT AS PARQUET;""") + conn.commit() + print("✅ Copied data to meter_today.") + + # ==== Merge into meterraw ==== + print("🔄 Merging meter_today into meterraw...") + try: + with Session() as session: + # Delete duplicates + delete_result = session.execute(text(""" + DELETE 
FROM lightson.meterraw + WHERE _id IN ( + SELECT _id FROM lightson.meterraw + INTERSECT + SELECT _id FROM lightson.meter_today + ) + """)) + session.commit() + print(f"🗑️ Deleted rows from meterraw: {delete_result.rowcount}") + + # Insert new records + insert_result = session.execute(text(""" + INSERT INTO lightson.meterraw ( + _id, m, ds, ms, power_w_, ac_power, angle, current_kwh, fan_speed, + freq, mode, serialno, temp, updatedat, createdat, current_phase_1, + current_phase_2, current_phase_3, voltage_phase_1, voltage_phase_2, + voltage_phase_3, power_factor_phase_1, power_factor_phase_2, power_factor_phase_3, + room_temp, room_humid, ac_hs, ac_vs, kwh_5min, v, "timestamp", oem, algo_flag + ) + SELECT + _id, m, + CAST(ds AS BOOLEAN), + CAST(ms AS BOOLEAN), + "power(w)" AS power_w_, + ac_power, angle, current_kwh, fan_speed, freq, mode, serialno, temp, + updatedat, createdat, current_phase_1, current_phase_2, current_phase_3, + voltage_phase_1, voltage_phase_2, voltage_phase_3, + power_factor_phase_1, power_factor_phase_2, power_factor_phase_3, + room_temp, room_humid, ac_hs, ac_vs, kwh_5min, v, "timestamp", oem, algo_flag + FROM lightson.meter_today + """)) + session.commit() + print(f"✅ Inserted rows into meterraw: {insert_result.rowcount}") + + except Exception as e: + print("❌ Merge failed:", e) + traceback.print_exc() + raise + + print("🎉 MeterRaw pipeline complete.") + + except Exception as e: + print("🔥 Fatal error in pipeline:", e) + traceback.print_exc() + raise + finally: + # Clean up resources + if 'client' in locals(): + client.close() + if 'engine' in locals(): + engine.dispose() + print("Resources cleaned up.") + +if __name__ == "__main__": + meter_raw_migration()
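+
+
+
+#--------------------------------------------------------- hedged sketches (notes on the pipeline above) ------------------------------------------------------------
+
+# Why start_date uses IST.localize() above: with pytz, attaching the zone via replace(tzinfo=...)
+# picks up the zone's historical LMT offset (+05:53 for Asia/Kolkata) instead of +05:30, which
+# silently shifts the query window by roughly 23 minutes. A minimal, self-contained illustration;
+# not part of the pipeline:
+from datetime import datetime
+from pytz import timezone
+
+ist = timezone('Asia/Kolkata')
+naive = datetime(2024, 1, 1, 0, 0, 0)
+
+print(naive.replace(tzinfo=ist).utcoffset())   # 5:53:00 -> LMT offset, not what we want
+print(ist.localize(naive).utcoffset())         # 5:30:00 -> correct IST offset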
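+
+
+# The dls export pages through MongoDB with skip()/limit(). skip() re-walks all skipped documents on
+# every chunk and can drift if new documents arrive mid-run. A hedged alternative is to resume from
+# the last seen _id instead; the sketch below is illustrative only (fetch_in_id_chunks is a
+# hypothetical helper, not part of the pipeline) and assumes the query does not already constrain _id.
+from pymongo import ASCENDING
+
+def fetch_in_id_chunks(collection, query, projection, chunk_size=200000):
+    """Yield batches of documents, resuming from the last seen _id instead of using skip()."""
+    last_id = None
+    while True:
+        q = dict(query)
+        if last_id is not None:
+            q["_id"] = {"$gt": last_id}
+        batch = list(
+            collection.find(q, projection).sort("_id", ASCENDING).limit(chunk_size)
+        )
+        if not batch:
+            break
+        last_id = batch[-1]["_id"]
+        yield batch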
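+
+
+# The delete + insert merges above commit in two separate sessions, so a failure after the delete but
+# before the insert leaves the overlapping rows missing until the next run. A minimal sketch of running
+# both statements in one transaction with SQLAlchemy's engine.begin() (commit on success, rollback on
+# error). It assumes staging and target share the same column layout, which is NOT the case for the
+# tables above (those still need their explicit column mappings); merge_staging_into_target is a
+# hypothetical helper, not part of the pipeline.
+from sqlalchemy import text
+
+def merge_staging_into_target(engine, target, staging):
+    with engine.begin() as conn:  # one transaction: all-or-nothing
+        deleted = conn.execute(text(f"""
+            DELETE FROM {target}
+            WHERE _id IN (SELECT _id FROM {staging})
+        """))
+        inserted = conn.execute(text(f"""
+            INSERT INTO {target}
+            SELECT * FROM {staging}
+        """))
+        print(f"deleted={deleted.rowcount}, inserted={inserted.rowcount}")
+
+# Usage (hypothetical table names): merge_staging_into_target(engine, "schema.target_table", "schema.staging_table")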