{ "nbformat": 4, "nbformat_minor": 0, "metadata": { "colab": { "private_outputs": true, "provenance": [] }, "kernelspec": { "name": "python3", "display_name": "Python 3" }, "language_info": { "name": "python" } }, "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "id": "7kZDwuM2BdFf" }, "outputs": [], "source": [ "import time\n", "import logging\n", "import traceback\n", "from math import ceil\n", "from os import getenv\n", "from datetime import datetime, timedelta\n", "from pathlib import Path\n", "\n", "import pandas as pd\n", "import polars as pl\n", "import pytz\n", "from zoneinfo import ZoneInfo\n", "\n", "import boto3\n", "import awswrangler as wr\n", "import redshift_connector\n", "import psycopg2\n", "\n", "from pymongo import MongoClient, errors\n", "from pymongo.database import Database\n", "from pymongo.collection import Collection\n", "\n", "from sqlalchemy import create_engine, text\n", "from sqlalchemy.orm import sessionmaker\n", "\n", "\n", "\n", "\n", "\n", "IST = pytz.timezone('Asia/Kolkata')\n", "now = datetime.now(IST)\n", "print(now.strftime('%Y-%m-%d %H:%M:%S'))\n", "\n", "\n", "\n", "UTC = pytz.UTC # or: pytz.timezone('UTC')\n", "now = datetime.now(UTC)\n", "print(now)\n", "\n", "\n", "\n", "\n", "#----------------------port forwarding ---------------------\n", "\n", "\n", "import subprocess\n", "import socket\n", "import time\n", "\n", "def is_port_in_use(port):\n", " with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:\n", " return s.connect_ex(('localhost', port)) == 0\n", "\n", "def start_port_forward():\n", " try:\n", " print(\"Starting port-forward...\")\n", " # You can add full path to ./kubectl if needed\n", " subprocess.Popen(\n", " ['./kubectl', 'port-forward', '--namespace', 'mongo', 'mongo-db-dr-0', '9001:27017'],\n", " stdout=subprocess.DEVNULL,\n", " stderr=subprocess.DEVNULL\n", " )\n", " print(\"Port-forward command issued.\")\n", " except Exception as e:\n", " print(f\"Error starting port-forward: {e}\")\n", "\n", "def ensure_port_forward():\n", " if is_port_in_use(9001):\n", " print(\"Port 9001 is already in use — no need to start port-forward.\")\n", " else:\n", " print(\"Port 9001 is NOT in use — starting port-forward.\")\n", " start_port_forward()\n", " time.sleep(5) # Wait a few seconds to let it establish\n", " if is_port_in_use(9001):\n", " print(\"Port-forward established successfully\")\n", " else:\n", " print(\"Failed to establish port-forward \")\n", "\n", "if __name__ == \"__main__\":\n", " ensure_port_forward()\n", "\n", "\n", "\n", "\n", "\n", "#-----------------------------data migration start-----------------------\n", "\n", "\n", "\n", "\n", "\n", "# Configure logging to print instead of writing to a file\n", "logging.basicConfig(level=logging.INFO, format=\"%(asctime)s - %(levelname)s - %(message)s\")\n", "def data_migration(client):\n", " \"\"\"Fetch, process, and store data from MongoDB.\"\"\"\n", " try:\n", " db = client[\"lightson\"]\n", " logging.info(\"Connected to MongoDB successfully.\")\n", " print(\" Connection successfully established.\")\n", "\n", "\n", "\n", "\n", " # New_Energy_Saving Data\n", " collection = db['new_energySaving']\n", " end_date = datetime.today() - timedelta(days=1)\n", " # Ensure all required columns exist\n", " required_columns = [\"_id\", \"serialNo\", \"devices\", \"mitigated_co2\",\"et\" , \"total_kwh\", \"deviceMapID\", \"weather_min_temp\", \"weather_max_temp\", \"billing_amount\",\n", " \"updatedAt\", \"ac_fan_hrs\", \"ac_run_hrs\", \"algo_status\", \"clientID\", 
\"cost_reduction\", \"createdAt\", \"energy_savings_ref_kwh\",\n", " \"energy_savings_us_meter\", \"energy_savings_inv_factor\", \"energy_savings_us_calc\", \"energy_savings_savings_percent\"]\n", "\n", " cursor = collection.find({\"createdAt\": {\"$lt\": end_date},},\n", " {\"_id\": 1, \"serialNo\": 1, \"devices\": 1, \"mitigated_co2\": 1, \"total_kwh\": 1, \"deviceMapID\": 1, \"weather\": 1,\n", " \"billing_amount\": 1, \"updatedAt\": 1, \"ac_fan_hrs\": 1, \"ac_run_hrs\": 1, \"algo_status\": 1,\n", " \"clientID\": 1, \"cost_reduction\": 1, \"createdAt\": 1, \"energy_savings\": 1})\n", " # Convert cursor to a list\n", " data = list(cursor)\n", " if not data:\n", " print(\"No data found in the specified date range.\")\n", " else:\n", " df_nes = pd.json_normalize(data, sep='_')\n", " df_nes = df_nes.explode(\"devices\")\n", " # Convert ObjectId fields to strings\n", "\n", " for col in ['_id', 'clientID', 'deviceMapID', 'devices', \"serialNo\"]:\n", " if col in df_nes.columns:\n", " df_nes[col] = df_nes[col].astype(str)\n", "\n", " # Convert numerical fields properly\n", "\n", " for col in [\"mitigated_co2\", \"energy_savings_ref_kwh\", \"energy_savings_savings_percent\"]:\n", " if col in df_nes.columns:\n", " df_nes[col] = pd.to_numeric(df_nes[col], errors=\"coerce\")\n", "\n", "\n", " for col in required_columns:\n", " if col not in df_nes.columns:\n", " df_nes[col] = None\n", "\n", " df_nes = df_nes[required_columns]\n", "\n", " df_nes.drop_duplicates(subset='_id')\n", " df_nes[\"et\"] = df_nes[\"et\"].fillna(\"\")\n", " df_nes.rename(columns={'weather_min_temp': 'min_temp','weather_max_temp': 'max_temp','energy_savings_ref_kwh': 'ref_kwh',\n", " 'energy_savings_us_meter': 'us_meter','energy_savings_inv_factor': 'inv_factor',\n", " 'energy_savings_us_calc': 'us_calc','energy_savings_savings_percent': 'savings_percent'}, inplace=True)\n", " # df_nes[\"et\"] = df_nes[\"et\"].fillna(\"0\").astype(str)\n", " df_nes.to_parquet(\"New_Energy_Saving.parquet\", use_deprecated_int96_timestamps=True, index=False, engine='pyarrow')\n", " df_nes = pd.read_parquet(\"New_Energy_Saving.parquet\")\n", " print(df_nes.head(3))\n", " print(\"New_Energy_Saving.parquet has been successfully saved.\")\n", "\n", " # Devices Data\n", " df_devices = pd.DataFrame(db.devices.find())\n", " if df_devices.empty:\n", " logging.warning(\"No data found in 'devices' collection.\")\n", " else:\n", " df_devices[\"_id\"] = df_devices[\"_id\"].astype(str)\n", " df_devices[\"assignedTo\"] = df_devices[\"assignedTo\"].astype(str)\n", " if \"ports\" in df_devices.columns:\n", " df_devices[\"ports\"] = df_devices[\"ports\"].apply(lambda x: list(map(str, x)) if isinstance(x, list) else x)\n", " df_devices = df_devices.explode(\"ports\")\n", " for column in df_devices.columns:\n", " if column not in [\"_id\",\"__v\",\"algorithm_type\",\"assignedTo\",\"AV1_ENABLE\",\"AV1_PENDING_STATUS\",\"AV2_ENABLE\",\"AV2_PENDING_STATUS\",\"configured\",\n", " \"createdAt\",\"deviceId\",\"dType\",\"IS_MAINTENANCE\",\"IS_PREDICTIVE_MAINTENANCE\",\"last_change\",\"lastConfiguredAt\",\"lastFactoryResetAt\",\"live_algo_status\",\n", " \"lp\",\"macId\",\"modelnumber\",\"ns\",\"online\",\"OTAUpdatedAt\",\"ports\",\"proto_updatedAt\",\"protocolname\",\"status\",\"temp\",\"updatedAt\",\"version\",]:\n", " df_devices.drop(columns=column, inplace=True)\n", " df_devices.to_parquet(\"devices.parquet\", use_deprecated_int96_timestamps = True, index = False)\n", " df_devices = pd.read_parquet(\"devices.parquet\")\n", " df_devices = 
df_devices.drop_duplicates(subset=[\"_id\"])\n", " print(\"Devices Data:\\n\", df_devices.head(1)) # Debugging Output\n", "\n", " # Ports Data\n", " df_ports = pd.DataFrame(db.ports.find())\n", " if df_ports.empty:\n", " logging.warning(\"No data found in 'ports' collection.\")\n", " else:\n", " df_ports = df_ports.astype({\"_id\": \"string\", \"device\": \"string\", \"groupId\": \"string\"}, errors=\"ignore\")\n", " df_ports = df_ports.astype(\"string\", errors=\"ignore\")\n", " df_ports.to_parquet(\"ports.parquet\", use_deprecated_int96_timestamps = True, index = False)\n", " df_ports = pd.read_parquet(\"ports.parquet\")\n", "\n", " print(\"Ports Data:\\n\", df_ports.head(1))\n", "\n", " # Clients Data\n", " df_client = pd.DataFrame(db.clients.find())\n", " if df_client.empty:\n", " logging.warning(\"No data found in 'clients' collection.\")\n", " else:\n", " df_client[\"_id\"] = df_client[\"_id\"].astype(str)\n", " for col in [\"users\", \"feature_configuration\"]:\n", " if col in df_client.columns:\n", " df_client.drop(col, axis=1, inplace=True)\n", " if \"devices\" in df_client.columns:\n", " df_client[\"devices\"] = df_client[\"devices\"].apply(lambda x: list(map(str, x)) if isinstance(x, list) else x)\n", " df_client = df_client.explode(\"devices\")\n", " for column in df_client.columns:\n", " if column not in [\"_id\",\"devices\",\"n\",\"e\",\"m\",\"addr\",\"pin\",\"city\",\"state\",\n", " \"country\",\"createdAt\",\"updatedAt\",\"__v\",\"ct\"]:\n", " df_client.drop(columns=column, inplace=True)\n", " df_client.to_parquet(\"clients.parquet\" , use_deprecated_int96_timestamps = True, index = False)\n", " df_client = pd.read_parquet(\"clients.parquet\")\n", " print(\"Clients Data:\\n\", df_client.head(1)) # Debugging Output\n", "\n", " # Device Map Data\n", " df_deviceMap = pd.DataFrame(client.lightson.deviceMap.find())\n", " if df_deviceMap.empty:\n", " logging.warning(\"No data found in 'deviceMap' collection.\")\n", " else:\n", " df_deviceMap[\"_id\"] = df_deviceMap[\"_id\"].astype(str)\n", " df_deviceMap[\"ac_age\"] = df_deviceMap[\"ac_age\"].astype(str)\n", " if \"clientID\" in df_deviceMap.columns:\n", " df_deviceMap[\"clientID\"] = df_deviceMap[\"clientID\"].astype(str)\n", " if \"devices\" in df_deviceMap.columns:\n", " df_deviceMap[\"devices\"] = df_deviceMap[\"devices\"].apply(lambda x: list(map(str, x)) if isinstance(x, list) else x)\n", " df_deviceMap = df_deviceMap.explode(\"devices\")\n", " if \"facilityId\" in df_deviceMap.columns:\n", " df_deviceMap[\"facilityId\"] = df_deviceMap[\"facilityId\"].astype(str)\n", " df_devicemap2 = df_deviceMap.copy(deep = True)\n", " # Flatten the nested toggle_threshold dict into toggle_threshold_* columns alongside the existing fields (column-wise concat, aligned by position)\n", " df_deviceMap = pd.concat([df_devicemap2.drop(\"toggle_threshold\", axis = 1).reset_index(drop=True), pd.json_normalize(df_devicemap2[\"toggle_threshold\"], sep = \"_\").add_prefix(\"toggle_threshold_\")], axis = 1)\n", " for column in df_deviceMap.columns:\n", " if column not in [\"_id\",\"__v\",\"ac_age\",\"ac_brand\",\"ac_model_number\",\"ac_ton\",\"ac_type\",\"atm_id\",\"auto_params_calculation\",\"city\",\"client_type\",\"clientID\",\"cool_down\",\n", " \"createdAt\",\"current_kwh\",\"current_threshold\",\"devices\",\"Efan\",\"fan_consumption\",\"fan_off_equal\",\"fan_threshold\",\"fm_name\",\"fromdate\",\"installation_date\",\n", " \"latest_ES_Date\",\"Meter\",\"mode_threshold\",\"facilityId\",\"model\",\"NAC\",\"perUnitCost\",\"phase\",\"power_correction\",\"ref_kwh\",\"run_hrs\",\"sched_lock\",\n", " 
\"serialNo\",\"SIM_number\",\"site\",\"site_category\",\"site_status\",\"temp_threshold\",\"todate\",\"total_real_energy\",\"updatedAt\",\"sched_lock\",\n", " \"sched_executed_at\",\"sched_executed_pwr\",\"sched_imminent\",\"sched_last_mismatch_at\",\"sched_corrected_at\",\"star_rating\",\"toggle_threshold_on2off\",\n", " \"toggle_threshold_off2on\",\"toggle_threshold_notfan2fan\",\"toggle_threshold_fan2notfan\"\n", " ]:\n", " df_deviceMap.drop(columns=column, inplace=True)\n", " df_deviceMap.to_parquet(\"deviceMap.parquet\", use_deprecated_int96_timestamps = True, index = False)\n", " df_deviceMap = pd.read_parquet(\"deviceMap.parquet\")\n", " df_deviceMap = pd.read_parquet(\"deviceMap.parquet\").drop_duplicates(subset=['_id'], keep='first').reset_index(drop=True)\n", "\n", " print(\"Device Map Data:\\n\", df_deviceMap.head(1))\n", "\n", "\n", " # daily_dump\n", "\n", " df_dump = pd.read_csv(f\"https://docs.google.com/spreadsheets/d/{'16CfL8N28-2nqUxWkbRGQVKlGViW0SoGlbsX4cUIzxKY'}/export?format=csv\")\n", " df_dump.columns = df_dump.columns.str.replace(' ', '_')\n", " df_dump = df_dump.astype(\"string\", errors=\"ignore\")\n", " df_dump[\"Date\"] = pd.to_datetime(df_dump[\"Date\"], format = \"%d/%m/%Y\", errors=\"coerce\")\n", " df_dump.to_parquet(\"daily_dump.parquet\", index = False, use_deprecated_int96_timestamps=True)\n", " df_dump = pd.read_parquet(\"daily_dump.parquet\")\n", "\n", "\n", "\n", "\n", "\n", " df_deviceMap = pd.read_parquet(\"deviceMap.parquet\")\n", " df_clients = pd.read_parquet(\"clients.parquet\")\n", " df_ports = pd.read_parquet(\"ports.parquet\")\n", " df_nes = pd.read_parquet(\"New_Energy_Saving.parquet\")\n", " df_devices = pd.read_parquet(\"devices.parquet\")\n", " df_dump = pd.read_parquet(\"daily_dump.parquet\")\n", " # Correct dictionary (keys should be strings, not DataFrames)\n", " dfs = {\"devicemap\": df_deviceMap,\"clients\": df_clients,\"ports\": df_ports,\"New_Energy_Saving\": df_nes,\"devices\": df_devices ,\"daily_dump\":df_dump}\n", "\n", " session = boto3.Session(\n", " aws_access_key_id = \"AKIARXCGNNYDRHFBBB6I\",\n", " aws_secret_access_key = \"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\",\n", " region_name=\"ap-south-1\")\n", "\n", " s3_path = \"s3://daily-dump-bucket/daily_dump\"\n", "\n", "\n", " conn_redshift = wr.redshift.connect(secret_id=\"sqlworkbench!d5e4ab06-e3af-4c43-9bb9-d48649ab72cb\", boto3_session=session)\n", "\n", " # Loop through DataFrames and print their sizes\n", " dfs = {\"devicemap\": df_deviceMap,\"clients\": df_clients,\"ports\": df_ports,\"new_energy_saving\": df_nes,\"devices\": df_devices ,\"daily_dump\": df_dump}\n", " for name, df in dfs.items():\n", " wr.redshift.copy(df=df,\n", " path= s3_path,\n", " con=conn_redshift,\n", " schema=\"lightson\",\n", " table=name,\n", " mode=\"overwrite\",\n", " varchar_lengths_default=65535,\n", " boto3_session=session)\n", " time.sleep(5) # Delay to prevent rapid successive writes\n", "\n", "\n", " logging.info(\"Data migration completed successfully.\")\n", " return df_devices, df_ports, df_client, df_deviceMap , df_nes , df_dump\n", "\n", "\n", " wr.redshift.copy(df=df_dump,\n", " path= s3_path,\n", " con=conn_redshift,\n", " schema=\"lightson\",\n", " table=daily_dump_historical,\n", " mode=\"append\",\n", " varchar_lengths_default=65535,\n", " boto3_session=session)\n", "\n", " except errors.ServerSelectionTimeoutError:\n", " logging.error(\" MongoDB connection failed! 
Is the server running?\")\n", " return None\n", " except Exception as e:\n", " logging.error(f\" Unexpected error: {e}\")\n", " return None\n", "\n", "\n", "# Example usage\n", "try:\n", " client = MongoClient(\"mongodb://localhost:9001/lightson?directConnection=true&serverSelectionTimeoutMS=5000\")\n", " logging.info(\"MongoDB client created successfully.\")\n", "\n", " df_devices, df_ports, df_client, df_deviceMap , df_nes , df_dump = data_migration(client)\n", "\n", " # Final check if data was retrieved\n", " if df_nes is not None:\n", " print(\"\\n Migration completed successfully.\")\n", " else:\n", " print(\"\\n Migration failed. Check logs for details.\")\n", "except Exception as e:\n", " logging.error(f\" Fatal error in script: {e}\")\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "#---------------------------------------------dls migration ------------------------------\n", "\n", "\n", "import pandas as pd\n", "import boto3\n", "import awswrangler as wr\n", "from datetime import datetime, timedelta\n", "from pymongo import MongoClient\n", "from pathlib import Path\n", "import os\n", "import polars as pl\n", "from pytz import timezone\n", "\n", "# Constants\n", "UTC = timezone('UTC')\n", "IST = timezone('Asia/Kolkata')\n", "\n", "def main():\n", " try:\n", " # Initialize connections and settings\n", " client = MongoClient(\"mongodb://localhost:9001/lightson?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+1.8.0\")\n", " db = client['lightson']\n", " collection = db['dls']\n", "\n", " s3_path = \"s3://daily-dump-bucket/ankur-v\"\n", " session = boto3.Session(\n", " aws_access_key_id=\"AKIARXCGNNYDRHFBBB6I\",\n", " aws_secret_access_key=\"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\",\n", " region_name=\"ap-south-1\"\n", " )\n", "\n", " conn_redshift = wr.redshift.connect(secret_id=\"sqlworkbench!d5e4ab06-e3af-4c43-9bb9-d48649ab72cb\", boto3_session=session)\n", "\n", " # Clean up previous file if exists\n", " Path(\"dls.csv\").unlink(missing_ok=True)\n", "\n", " # Batch size for processing\n", " batch_size = 200000\n", " output_file = \"dls.csv\"\n", "\n", " # Get yesterday's date in UTC (12:00 AM IST)\n", " start_date = (datetime.today() - timedelta(days=1)).replace(tzinfo=IST).astimezone(UTC).replace(tzinfo=None)\n", "\n", " # Define the expected columns (schema) for the CSV file\n", " columns = [\n", " \"_id\", \"__v\", \"m\", \"t\", \"pl_m\", \"pl_v\", \"pl_Algo_flag\", \"pl_pir\", \"pl_DS\", \"pl_F\",\n", " \"pl_https_exp\", \"pl_Humidity\", \"pl_LMF\", \"pl_Mo\", \"pl_mqtts_exp\", \"pl_multiple_flag\",\n", " \"pl_NS\", \"pl_online\", \"dId\", \"cId\", \"ports\", \"updatedAt\", \"createdAt\",\n", " \"documentUpdatedAt\", \"pl_tz_set\", \"pl_timestamp\", \"pl_oem\",\n", " \"pl_off_pkt\", \"pl_OL\", \"pl_P\", \"pl_pf\", \"pl_ph_ang\", \"pl_proto\", \"pl_publish\",\n", " \"pl_room_humid\", \"pl_room_temp\", \"pl_sens_pos\", \"pl_single_flag\", \"pl_T\",\n", " \"pl_Tempc\", \"pl_Time\", \"pl_tot_run_hrs\", \"pl_tz\", \"pl_meter\", \"pl_voltage\",\n", " \"pl_current\", \"pl_power\", \"pl_freq\", \"pl_kwh\", \"pl_kwh_5min\",\n", " \"pl_run_hrs_5min\", \"pl_ac_p\", \"pl_ac_mo\", \"pl_ac_fs\", \"pl_ac_temp\",\n", " \"pl_ac_hs\", \"pl_ac_vs\", \"pl_rssi\"\n", " ]\n", "\n", " # Function to process and append chunks to CSV\n", " def process_and_append_to_csv(skip, first_chunk):\n", " cursor = collection.find(\n", " {\n", " \"t\": \"p\",\n", " \"createdAt\": {\"$gte\": start_date},\n", " },\n", " 
{\"_id\": 1, \"__v\": 1, \"m\": 1, \"t\": 1, \"pl\": 1, \"dId\": 1, \"cId\": 1, \"pVal\": 1,\n", " \"ports\": 1, \"updatedAt\": 1, \"createdAt\": 1, \"documentUpdatedAt\": 1}\n", " ).skip(skip).limit(batch_size)\n", "\n", " data = list(cursor)\n", " if not data:\n", " return False\n", "\n", " df = pd.json_normalize(data, sep=\"_\")\n", " print(df[\"updatedAt\"].min(), df[\"updatedAt\"].max())\n", "\n", " if \"ports\" in df.columns:\n", " df[\"ports\"] = df[\"ports\"].apply(lambda x: str(x[0]) if isinstance(x, list) and x else None)\n", "\n", " df.loc[:, ~df.columns.isin([\"updatedAt\", \"createdAt\", \"documentUpdatedAt\"])] = \\\n", " df.loc[:, ~df.columns.isin([\"updatedAt\", \"createdAt\", \"documentUpdatedAt\"])].astype(str)\n", "\n", " for col in columns:\n", " if col not in df.columns:\n", " df[col] = None\n", "\n", " df[\"pl_power\"] = df['pl_power'].replace('nan', None)\n", " df[\"pl_Humidity\"] = df['pl_Humidity'].replace('nan', None)\n", " df[\"pl_Tempc\"] = df['pl_Tempc'].replace('nan', None)\n", " df[\"pl_room_temp\"] = df['pl_room_temp'].replace('nan', None)\n", " df[\"pl_kwh_5min\"] = df['pl_kwh_5min'].replace('nan', None)\n", "\n", " df = df[columns]\n", " df.to_csv(output_file, mode='a', header=first_chunk, index=False)\n", " return True\n", "\n", " # Processing the data in chunks\n", " skip = 0\n", " first_chunk = True\n", " chunk_number = 0\n", "\n", " while True:\n", " if not process_and_append_to_csv(skip, first_chunk):\n", " break\n", "\n", " chunk_number += 1\n", " print(f\"Chunk {chunk_number} successfully appended.\")\n", " skip += batch_size\n", " first_chunk = False\n", "\n", " print(f\"Data has been written to {output_file}. Total chunks processed: {chunk_number}\")\n", "\n", " # Convert CSV to Parquet\n", " df = pl.scan_csv(\"dls.csv\", infer_schema=False)\n", " cols = df.collect_schema().names()\n", " print(f\"Columns count: {len(cols)}, Unique columns (case-insensitive): {len(set(x.lower() for x in cols))}\")\n", "\n", " df_final = df.with_columns(\n", " pl.col(\"createdAt\").str.to_datetime(\"%Y-%m-%d %H:%M:%S%.f\", strict=False).alias(\"createdAt\"),\n", " pl.col(\"updatedAt\").str.to_datetime(\"%Y-%m-%d %H:%M:%S%.f\", strict=False).alias(\"updatedAt\"),\n", " pl.col(\"documentUpdatedAt\").str.to_datetime(\"%Y-%m-%d %H:%M:%S%.f\", strict=False).alias(\"documentUpdatedAt\"),\n", " ).collect().write_parquet(\n", " \"dls_updated.parquet\",\n", " compression=\"snappy\",\n", " use_pyarrow=True,\n", " pyarrow_options={\"use_deprecated_int96_timestamps\": True}\n", " )\n", "\n", " # Filter out null createdAt records\n", " df2 = pl.scan_parquet(\"dls_updated.parquet\")\n", " df2.filter(pl.col(\"createdAt\").is_not_null()).collect().write_parquet(\n", " \"dls_updated_final2.parquet\",\n", " compression=\"snappy\",\n", " use_pyarrow=True,\n", " pyarrow_options={\"use_deprecated_int96_timestamps\": True}\n", " )\n", "\n", " # Verify data\n", " dls_df = pl.scan_parquet(\"dls_updated_final2.parquet\")\n", " min_date, max_date = dls_df.select(pl.min(\"updatedAt\")).collect(), dls_df.select(pl.max(\"updatedAt\")).collect()\n", " print(f\"Data range - Min: {min_date}, Max: {max_date}\")\n", " print(\"dls is ready\")\n", "\n", " # Final cleanup and upload preparation\n", " df = pd.read_parquet(\"dls_updated_final2.parquet\")\n", " df[['pl_power','pl_Humidity','pl_Tempc','pl_room_temp','pl_kwh_5min']] = \\\n", " df[['pl_power','pl_Humidity','pl_Tempc','pl_room_temp','pl_kwh_5min']].replace('nan', None)\n", " df = df.dropna(axis=1, how='all')\n", " 
df.to_parquet(\"dls_updated_final2.parquet\", use_deprecated_int96_timestamps=True)\n", "\n", " print(\"Data migration completed successfully!\")\n", "\n", " except Exception as e:\n", " print(f\"Error during data migration: {str(e)}\")\n", " raise\n", " finally:\n", " # Clean up resources\n", " if 'client' in locals():\n", " client.close()\n", " if 'conn_redshift' in locals():\n", " conn_redshift.close()\n", "\n", "\n", "if __name__ == \"__main__\":\n", " main()\n", "\n", "\n", "\n", "# ==== AWS + S3 Config ====\n", "aws_access_key_id = 'AKIARXCGNNYDRHFBBB6I'\n", "aws_secret_access_key = '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'\n", "region_name = 'ap-south-1'\n", "bucket_name = 'ankur-v'\n", "s3_output_key = 'dls.parquet'\n", "local_file_path = 'dls_updated_final2.parquet'\n", "s3_path = f\"s3://{bucket_name}/{s3_output_key}\"\n", "\n", "# ==== Redshift Config ====\n", "redshift_host = 'ml-test-1.cfppg9yhkagj.ap-south-1.redshift.amazonaws.com'\n", "redshift_db = 'dev'\n", "redshift_user = 'awsuser'\n", "redshift_password = 'LtRedshift-2024'\n", "redshift_port = 5439\n", "\n", "# ==== SQLAlchemy Config ====\n", "engine = create_engine(\n", " f'postgresql+psycopg2://{redshift_user}:{redshift_password}@{redshift_host}:{redshift_port}/{redshift_db}'\n", ")\n", "Session = sessionmaker(bind=engine)\n", "\n", "# ==== Step 1: Upload to S3 ====\n", "def upload_to_s3():\n", " print(\"🔼 Uploading file to S3...\")\n", " session = boto3.Session(\n", " aws_access_key_id=aws_access_key_id,\n", " aws_secret_access_key=aws_secret_access_key,\n", " region_name=region_name\n", " )\n", " s3 = session.resource('s3')\n", " s3.meta.client.upload_file(Filename=local_file_path, Bucket=bucket_name, Key=s3_output_key)\n", " print(\"✅ File uploaded to S3.\")\n", "\n", "# ==== Step 2: Copy to Redshift (dls_today) ====\n", "def copy_to_redshift():\n", " print(\"⬇️ Copying data into Redshift temporary table...\")\n", " conn = psycopg2.connect(\n", " host=redshift_host,\n", " port=redshift_port,\n", " database=redshift_db,\n", " user=redshift_user,\n", " password=redshift_password\n", " )\n", " cursor = conn.cursor()\n", " cursor.execute(\"truncate table lightson.dls_today;\")\n", " copy_query = f\"\"\"\n", " COPY lightson.dls_today\n", " FROM '{s3_path}'\n", " CREDENTIALS 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}'\n", " FORMAT AS PARQUET;\n", " \"\"\"\n", " cursor.execute(copy_query)\n", " cursor.close()\n", " conn.close()\n", " print(\"✅ Copied data to dls_today.\")\n", "\n", "# ==== Step 3: Merge data (delete + insert) ====\n", "def merge_into_dls():\n", " print(\"🔄 Merging dls_today into dls...\")\n", " try:\n", " with Session() as session:\n", " # Delete overlapping records\n", " delete_result = session.execute(text(\"\"\"\n", " DELETE FROM lightson.dls\n", " WHERE _id IN (\n", " SELECT _id FROM lightson.dls\n", " INTERSECT\n", " SELECT _id FROM lightson.dls_today\n", " )\n", " \"\"\"))\n", " session.commit()\n", " print(f\"🗑️ Rows deleted from dls: {delete_result.rowcount}\")\n", "\n", " with Session() as session:\n", " # Insert new records\n", " insert_result = session.execute(text(\"\"\"\n", " INSERT INTO lightson.dls (\n", " _id, __v, m, t, pl_m, pl_v, pl_algo_flag, pl_ds, pl_f, pl_https_exp,\n", " pl_humidity, pl_lmf, pl_mo, pl_mqtts_exp, pl_multiple_flag, pl_ns,\n", " pl_online, did, cid, ports_0, updatedat, createdat, documentupdatedat,\n", " pl_tz_set, pl_timestamp, pl_oem, pl_off_pkt, pl_ol, pl_p, pl_pf,\n", " pl_ph_ang, pl_proto, pl_publish, pl_room_humid, 
pl_room_temp,\n", " pl_sens_pos, pl_single_flag, pl_t, pl_tempc, pl_time, pl_tot_run_hrs,\n", " pl_tz, pl_meter, pl_voltage, pl_current, pl_power, pl_freq, pl_kwh,\n", " pl_kwh_5min, pl_run_hrs_5min, pl_ac_p, pl_ac_mo, pl_ac_fs, pl_ac_temp,\n", " pl_ac_hs, pl_ac_vs, pl_rssi\n", " )\n", " SELECT\n", " _id, __v, m, t, pl_m, pl_v, pl_algo_flag, pl_ds,\n", " NULL AS pl_f, NULL AS pl_https_exp, pl_humidity,\n", " NULL AS pl_lmf, NULL AS pl_mo, NULL AS pl_mqtts_exp,\n", " NULL AS pl_multiple_flag, pl_ns, pl_online, did, cid,\n", " ports AS ports_0, updatedat, createdat, documentupdatedat,\n", " pl_tz_set, pl_timestamp, pl_oem, pl_off_pkt,\n", " NULL AS pl_ol, NULL AS pl_p, pl_pf, pl_ph_ang,\n", " pl_proto, NULL AS pl_publish, pl_room_humid,\n", " pl_room_temp, pl_sens_pos, NULL AS pl_single_flag,\n", " NULL AS pl_t, pl_tempc, pl_time, pl_tot_run_hrs,\n", " NULL AS pl_tz, pl_meter, pl_voltage, pl_current,\n", " pl_power, pl_freq, pl_kwh, pl_kwh_5min, pl_run_hrs_5min,\n", " pl_ac_p, pl_ac_mo, pl_ac_fs, pl_ac_temp, pl_ac_hs,\n", " pl_ac_vs, pl_rssi\n", " FROM lightson.dls_today\n", " \"\"\"))\n", " session.commit()\n", " print(f\"✅ Rows inserted into dls: {insert_result.rowcount}\")\n", " except Exception as e:\n", " print(\"❌ Merge failed:\", e)\n", " traceback.print_exc()\n", "\n", "# ==== Main Runner ====\n", "if __name__ == \"__main__\":\n", " try:\n", " upload_to_s3()\n", " copy_to_redshift()\n", " merge_into_dls()\n", " print(\"🎉 Data pipeline complete.\")\n", " except Exception as e:\n", " print(\"🔥 Fatal error:\", e)\n", " traceback.print_exc()\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "#---------------------------------------------------------meter raw ------------------------------------------------------------\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "import pandas as pd\n", "import boto3\n", "import awswrangler as wr\n", "from datetime import datetime, timedelta\n", "from pymongo import MongoClient\n", "from pathlib import Path\n", "import os\n", "import polars as pl\n", "from pytz import timezone\n", "from math import ceil\n", "import time\n", "import psycopg2\n", "from sqlalchemy import create_engine, text\n", "from sqlalchemy.orm import sessionmaker\n", "import traceback\n", "\n", "# Constants\n", "IST = timezone('Asia/Kolkata')\n", "UTC = timezone('UTC')\n", "CHUNK_SIZE = 100000\n", "\n", "def meter_raw_migration():\n", " try:\n", " print(\"Starting meter_raw_migration process...\")\n", "\n", " # ==== Configuration ====\n", " # Define start_date (yesterday) and end_date (today) in UTC\n", " start_date = (datetime.today() - timedelta(days=1)).replace(tzinfo=IST).astimezone(UTC).replace(tzinfo=None)\n", " end_date = datetime.utcnow()\n", "\n", " # AWS and Redshift configuration\n", " aws_access_key_id = 'AKIARXCGNNYDRHFBBB6I'\n", " aws_secret_access_key = 'LFBwt03wgZK43fJ70B6AsmrOhS5XMtncOyNaI6Zh'\n", " region_name = 'ap-south-1'\n", " bucket_name = 'ankur-v'\n", " s3_output_key = 'meterraw.parquet'\n", " redshift_host = 'ml-test-1.cfppg9yhkagj.ap-south-1.redshift.amazonaws.com'\n", " redshift_db = 'dev'\n", " redshift_user = 'awsuser'\n", " redshift_password = 'LtRedshift-2024'\n", " redshift_port = 5439\n", "\n", " # ==== Data Processing ====\n", " # Create output directory\n", " root_path = Path.cwd() / \"meterRaw\" / end_date.strftime(\"%Y_%m_%d\")\n", " root_path.mkdir(parents=True, exist_ok=True)\n", "\n", " # Define schema and columns\n", " cols 
= [\n", " '_id', 'm', 'DS', 'MS', 'Power(W)', 'ac_power', 'angle', 'current_kwh',\n", " 'fan_speed', 'freq', 'mode', 'serialNo', 'temp', 'updatedAt', 'room_temp',\n", " 'room_humid', 'ac_hs', 'ac_vs', 'kwh_5min', 'v', 'timestamp', 'oem',\n", " 'Algo_flag', 'createdAt', 'current_phase_1', 'current_phase_2',\n", " 'current_phase_3', 'voltage_phase_1', 'voltage_phase_2', 'voltage_phase_3',\n", " 'power_factor_phase_1', 'power_factor_phase_2', 'power_factor_phase_3'\n", " ]\n", "\n", " schema = {\n", " \"_id\": \"string\",\n", " \"updatedAt\": \"datetime64[ns]\",\n", " \"createdAt\": \"datetime64[ns]\",\n", " \"m\": \"string\",\n", " \"current_phase_1\": \"Float64\",\n", " \"current_phase_2\": \"Float64\",\n", " \"current_phase_3\": \"Float64\",\n", " \"DS\": \"boolean\",\n", " \"MS\": \"boolean\",\n", " \"Power(W)\": \"Float64\",\n", " \"voltage_phase_1\": \"Float64\",\n", " \"voltage_phase_2\": \"Float64\",\n", " \"voltage_phase_3\": \"Float64\",\n", " \"ac_power\": \"Int64\",\n", " \"angle\": \"Float64\",\n", " \"current_kwh\": \"Float64\",\n", " \"fan_speed\": \"Int64\",\n", " \"freq\": \"Float64\",\n", " \"mode\": \"Int64\",\n", " \"power_factor_phase_1\": \"Float64\",\n", " \"power_factor_phase_2\": \"Float64\",\n", " \"power_factor_phase_3\": \"Float64\",\n", " \"serialNo\": \"string\",\n", " \"temp\": \"Float64\",\n", " \"room_temp\": \"Float64\",\n", " \"room_humid\": \"Float64\",\n", " \"ac_hs\": \"Int64\",\n", " \"ac_vs\": \"Int64\",\n", " \"kwh_5min\": \"Float64\",\n", " \"v\": \"string\",\n", " \"timestamp\": \"string\",\n", " \"oem\": \"string\",\n", " \"Algo_flag\": \"string\"\n", " }\n", "\n", " # MongoDB connection\n", " client = MongoClient(\"mongodb://localhost:9001/lightson?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+1.8.0\")\n", " db = client.get_database(\"lightson\")\n", " coll = db.get_collection(\"meterRaw\")\n", "\n", " # Query definitions\n", " query = {\n", " \"single\": {\n", " \"Current.0\": {\"$exists\": False}\n", " },\n", " \"three\": {\n", " \"Current.0\": {\"$exists\": True}\n", " }\n", " }\n", "\n", " # Helper function for chunk processing\n", " def get_chunk(rows: int, completed_count: int = 0, chunk_size=CHUNK_SIZE):\n", " chunks = ceil((rows - completed_count) / chunk_size)\n", " for i in range(chunks):\n", " skip = completed_count + (i * chunk_size)\n", " limit = min(rows - skip, chunk_size)\n", " yield skip, limit\n", "\n", " # ==== Process Single Phase Data ====\n", " print(\"Processing single phase data...\")\n", " query_single_phase = query[\"single\"].copy()\n", " query_single_phase.update({\n", " \"updatedAt\": {\n", " \"$gte\": start_date,\n", " \"$lte\": end_date\n", " }\n", " })\n", "\n", " count = coll.count_documents(query_single_phase)\n", " idx = 0\n", "\n", " for skip, limit in get_chunk(count):\n", " df = pd.DataFrame(list(coll.find(query_single_phase, skip=skip, limit=limit)))\n", " print(f\"Processing single phase chunk {idx}: {skip} to {skip+limit} | Date range: {df['updatedAt'].min()} to {df['updatedAt'].max()}\")\n", "\n", " df[\"_id\"] = df[\"_id\"].astype(str)\n", " df = df.rename(columns={\n", " \"Current\": \"current_phase_1\",\n", " \"Voltage\": \"voltage_phase_1\",\n", " \"power_factor\": \"power_factor_phase_1\"\n", " }).reindex(columns=cols).astype(schema)\n", "\n", " df.to_parquet(root_path / f\"single_phase_idx_{idx}.parquet\", use_deprecated_int96_timestamps=True)\n", " idx += 1\n", "\n", " # ==== Process Three Phase Data ====\n", " print(\"Processing three phase data...\")\n", " query_three_phase = 
query[\"three\"].copy()\n", " query_three_phase.update({\n", " \"updatedAt\": {\n", " \"$gte\": start_date,\n", " \"$lte\": end_date\n", " }\n", " })\n", "\n", " count = coll.count_documents(query_three_phase)\n", " idx = 0\n", "\n", " for skip, limit in get_chunk(count):\n", " df = pd.DataFrame(list(coll.find(query_three_phase, skip=skip, limit=limit)))\n", " print(f\"Processing three phase chunk {idx}: {skip} to {skip+limit} | Date range: {df['updatedAt'].min()} to {df['updatedAt'].max()}\")\n", "\n", " # Expand array columns\n", " for key in (\"Current\", \"Voltage\", \"power_factor\"):\n", " array_col = df.pop(key)\n", " df[[f\"{key.lower()}_phase_1\", f\"{key.lower()}_phase_2\", f\"{key.lower()}_phase_3\"]] = array_col.to_list()\n", "\n", " df[\"_id\"] = df[\"_id\"].astype(str)\n", " df = df.reindex(columns=cols).astype(schema)\n", " df.to_parquet(root_path / f\"three_phase_idx_{idx}.parquet\", use_deprecated_int96_timestamps=True)\n", " idx += 1\n", "\n", " # ==== Combine and Finalize Data ====\n", " print(\"Combining all data files...\")\n", " df = pl.scan_parquet(root_path / \"*.parquet\")\n", " final_filename = f\"meterRaw_{end_date.strftime('%Y_%m_%d')}.parquet\"\n", " df.collect().write_parquet(\n", " Path.cwd() / final_filename,\n", " compression=\"snappy\",\n", " use_pyarrow=True,\n", " pyarrow_options={\"use_deprecated_int96_timestamps\": True}\n", " )\n", "\n", " # Clean up temporary files\n", " for children in root_path.glob(\"*.parquet\"):\n", " children.unlink(missing_ok=True)\n", " root_path.rmdir()\n", "\n", " # Verify final file\n", " df2 = pl.scan_parquet(Path.cwd() / final_filename)\n", " date_range = df2.select([\n", " pl.col(\"createdAt\").min().alias(\"min\"),\n", " pl.col(\"createdAt\").max().alias(\"max\")\n", " ]).collect()\n", " print(f\"Data range in final file: {date_range}\")\n", " print(\"meterraw_is_generated\")\n", "\n", " # ==== Upload to S3 ====\n", " print(\"🔼 Uploading MeterRaw to S3...\")\n", " session = boto3.Session(\n", " aws_access_key_id=aws_access_key_id,\n", " aws_secret_access_key=aws_secret_access_key,\n", " region_name=region_name\n", " )\n", " s3 = session.resource('s3')\n", " s3.meta.client.upload_file(\n", " Filename=final_filename,\n", " Bucket=bucket_name,\n", " Key=s3_output_key\n", " )\n", " print(\"✅ File uploaded to S3.\")\n", "\n", " # ==== Redshift Operations ====\n", " # Initialize SQLAlchemy engine\n", " engine = create_engine(\n", " f'postgresql+psycopg2://{redshift_user}:{redshift_password}@{redshift_host}:{redshift_port}/{redshift_db}'\n", " )\n", " Session = sessionmaker(bind=engine)\n", "\n", " # ==== Copy to Redshift (meter_today) ====\n", " print(\"⬇️ Copying data into Redshift meter_today...\")\n", " s3_path = f\"s3://{bucket_name}/{s3_output_key}\"\n", "\n", "\n", "\n", " conn = psycopg2.connect(\n", " host=redshift_host,\n", " port=redshift_port,\n", " database=redshift_db,\n", " user=redshift_user,\n", " password=redshift_password)\n", "\n", "\n", "\n", "\n", " cursor = conn.cursor()\n", " cursor.execute(\"TRUNCATE TABLE lightson.meter_today;\")\n", " cursor.execute(f\"\"\"COPY lightson.meter_today FROM '{s3_path}' CREDENTIALS 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}' FORMAT AS PARQUET;\"\"\")\n", " conn.commit()\n", " print(\"✅ Copied data to meter_today.\")\n", "\n", " # ==== Merge into meterraw ====\n", " print(\"🔄 Merging meter_today into meterraw...\")\n", " try:\n", " with Session() as session:\n", " # Delete duplicates\n", " delete_result = 
session.execute(text(\"\"\"\n", " DELETE FROM lightson.meterraw\n", " WHERE _id IN (\n", " SELECT _id FROM lightson.meterraw\n", " INTERSECT\n", " SELECT _id FROM lightson.meter_today\n", " )\n", " \"\"\"))\n", " session.commit()\n", " print(f\"🗑️ Deleted rows from meterraw: {delete_result.rowcount}\")\n", "\n", " # Insert new records\n", " insert_result = session.execute(text(\"\"\"\n", " INSERT INTO lightson.meterraw (\n", " _id, m, ds, ms, power_w_, ac_power, angle, current_kwh, fan_speed,\n", " freq, mode, serialno, temp, updatedat, createdat, current_phase_1,\n", " current_phase_2, current_phase_3, voltage_phase_1, voltage_phase_2,\n", " voltage_phase_3, power_factor_phase_1, power_factor_phase_2, power_factor_phase_3,\n", " room_temp, room_humid, ac_hs, ac_vs, kwh_5min, v, \"timestamp\", oem, algo_flag\n", " )\n", " SELECT\n", " _id, m,\n", " CAST(ds AS BOOLEAN),\n", " CAST(ms AS BOOLEAN),\n", " \"power(w)\" AS power_w_,\n", " ac_power, angle, current_kwh, fan_speed, freq, mode, serialno, temp,\n", " updatedat, createdat, current_phase_1, current_phase_2, current_phase_3,\n", " voltage_phase_1, voltage_phase_2, voltage_phase_3,\n", " power_factor_phase_1, power_factor_phase_2, power_factor_phase_3,\n", " room_temp, room_humid, ac_hs, ac_vs, kwh_5min, v, \"timestamp\", oem, algo_flag\n", " FROM lightson.meter_today\n", " \"\"\"))\n", " session.commit()\n", " print(f\"✅ Inserted rows into meterraw: {insert_result.rowcount}\")\n", "\n", " except Exception as e:\n", " print(\"❌ Merge failed:\", e)\n", " traceback.print_exc()\n", " raise\n", "\n", " print(\"🎉 MeterRaw pipeline complete.\")\n", "\n", " except Exception as e:\n", " print(\"🔥 Fatal error in pipeline:\", e)\n", " traceback.print_exc()\n", " raise\n", " finally:\n", " # Clean up resources\n", " if 'client' in locals():\n", " client.close()\n", " if 'engine' in locals():\n", " engine.dispose()\n", " print(\"Resources cleaned up.\")\n", "\n", "if __name__ == \"__main__\":\n", " meter_raw_migration()\n" ] }, { "cell_type": "code", "source": [], "metadata": { "id": "0qJ5OXpnBdzk" }, "execution_count": null, "outputs": [] } ] }