!pip install sqlalchemy
!pip install redshift-connector
!pip install --upgrade sqlalchemy-redshift
!pip install polars
# Note: timedelta comes from the standard-library datetime module; no separate pip install is needed.

import time
import logging
import traceback
from math import ceil
from os import getenv
from datetime import datetime, timedelta
from pathlib import Path

import pandas as pd
import polars as pl
import pytz
from zoneinfo import ZoneInfo

import boto3
import awswrangler as wr
import redshift_connector
import psycopg2

from pymongo import MongoClient, errors
from pymongo.database import Database
from pymongo.collection import Collection

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

IST = pytz.timezone('Asia/Kolkata')
now = datetime.now(IST)
print(now.strftime('%Y-%m-%d %H:%M:%S'))

UTC = pytz.UTC  # or: pytz.timezone('UTC')
now = datetime.now(UTC)
print(now)

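# A minimal sketch (not in the original script): zoneinfo.ZoneInfo is imported above but never
# used; the stdlib equivalent of the two pytz lookups would look like this (assumes Python 3.9+).
ist_now = datetime.now(ZoneInfo("Asia/Kolkata"))
utc_now = datetime.now(ZoneInfo("UTC"))
print(ist_now.strftime('%Y-%m-%d %H:%M:%S'), utc_now.isoformat())
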
#----------------------port forwarding ---------------------

import subprocess
import socket
import time

def is_port_in_use(port):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        return s.connect_ex(('localhost', port)) == 0

def start_port_forward():
    try:
        print("Starting port-forward...")
        # You can add the full path to ./kubectl if needed
        subprocess.Popen(
            ['./kubectl', 'port-forward', '--namespace', 'mongo', 'mongo-db-dr-0', '9001:27017'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL
        )
        print("Port-forward command issued.")
    except Exception as e:
        print(f"Error starting port-forward: {e}")

def ensure_port_forward():
    if is_port_in_use(9001):
        print("Port 9001 is already in use — no need to start port-forward.")
    else:
        print("Port 9001 is NOT in use — starting port-forward.")
        start_port_forward()
        time.sleep(5)  # Wait a few seconds to let it establish
        if is_port_in_use(9001):
            print("Port-forward established successfully")
        else:
            print("Failed to establish port-forward")

if __name__ == "__main__":
    ensure_port_forward()

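# A minimal sketch (not part of the original pipeline): once ensure_port_forward() reports
# success, the tunnel can be sanity-checked with a pymongo ping before any migration runs.
# The URI mirrors the one used later in this script; adjust it if yours differs.
def check_mongo_via_tunnel(uri="mongodb://localhost:9001/?directConnection=true&serverSelectionTimeoutMS=5000"):
    from pymongo import MongoClient
    from pymongo.errors import PyMongoError
    client = MongoClient(uri)
    try:
        client.admin.command("ping")  # raises if the server is unreachable
        print("MongoDB is reachable through the port-forward.")
        return True
    except PyMongoError as e:
        print(f"MongoDB is not reachable: {e}")
        return False
    finally:
        client.close()
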
#-----------------------------data migration start-----------------------

# Configure logging to print instead of writing to a file
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

def data_migration(client):
    """Fetch, process, and store data from MongoDB."""
    try:
        db = client["lightson"]
        logging.info("Connected to MongoDB successfully.")
        print("Connection successfully established.")

        # New_Energy_Saving Data
        collection = db['new_energySaving']
        end_date = datetime.today() - timedelta(days=1)
        # Ensure all required columns exist
        required_columns = ["_id", "serialNo", "devices", "mitigated_co2", "et", "total_kwh", "deviceMapID", "weather_min_temp", "weather_max_temp", "billing_amount",
                            "updatedAt", "ac_fan_hrs", "ac_run_hrs", "algo_status", "clientID", "cost_reduction", "createdAt", "energy_savings_ref_kwh",
                            "energy_savings_us_meter", "energy_savings_inv_factor", "energy_savings_us_calc", "energy_savings_savings_percent"]

        cursor = collection.find({"createdAt": {"$lt": end_date}},
                                 {"_id": 1, "serialNo": 1, "devices": 1, "mitigated_co2": 1, "total_kwh": 1, "deviceMapID": 1, "weather": 1,
                                  "billing_amount": 1, "updatedAt": 1, "ac_fan_hrs": 1, "ac_run_hrs": 1, "algo_status": 1,
                                  "clientID": 1, "cost_reduction": 1, "createdAt": 1, "energy_savings": 1})
        # Convert cursor to a list
        data = list(cursor)
        if not data:
            print("No data found in the specified date range.")
        else:
            df_nes = pd.json_normalize(data, sep='_')
            df_nes = df_nes.explode("devices")

            # Convert ObjectId fields to strings
            for col in ['_id', 'clientID', 'deviceMapID', 'devices', 'serialNo']:
                if col in df_nes.columns:
                    df_nes[col] = df_nes[col].astype(str)

            # Convert numerical fields properly
            for col in ["mitigated_co2", "energy_savings_ref_kwh", "energy_savings_savings_percent"]:
                if col in df_nes.columns:
                    df_nes[col] = pd.to_numeric(df_nes[col], errors="coerce")

            for col in required_columns:
                if col not in df_nes.columns:
                    df_nes[col] = None

            df_nes = df_nes[required_columns]

            # drop_duplicates is not in-place, so the result must be assigned back
            df_nes = df_nes.drop_duplicates(subset='_id')
            df_nes["et"] = df_nes["et"].fillna("")
            df_nes.rename(columns={'weather_min_temp': 'min_temp', 'weather_max_temp': 'max_temp', 'energy_savings_ref_kwh': 'ref_kwh',
                                   'energy_savings_us_meter': 'us_meter', 'energy_savings_inv_factor': 'inv_factor',
                                   'energy_savings_us_calc': 'us_calc', 'energy_savings_savings_percent': 'savings_percent'}, inplace=True)
            # df_nes["et"] = df_nes["et"].fillna("0").astype(str)
            df_nes.to_parquet("New_Energy_Saving.parquet", use_deprecated_int96_timestamps=True, index=False, engine='pyarrow')
            df_nes = pd.read_parquet("New_Energy_Saving.parquet")
            print(df_nes.head(3))
            print("New_Energy_Saving.parquet has been successfully saved.")

        # Devices Data
        df_devices = pd.DataFrame(db.devices.find())
        if df_devices.empty:
            logging.warning("No data found in 'devices' collection.")
        else:
            df_devices["_id"] = df_devices["_id"].astype(str)
            df_devices["assignedTo"] = df_devices["assignedTo"].astype(str)
            if "ports" in df_devices.columns:
                df_devices["ports"] = df_devices["ports"].apply(lambda x: list(map(str, x)) if isinstance(x, list) else x)
                df_devices = df_devices.explode("ports")
            for column in df_devices.columns:
                if column not in ["_id", "__v", "algorithm_type", "assignedTo", "AV1_ENABLE", "AV1_PENDING_STATUS", "AV2_ENABLE", "AV2_PENDING_STATUS", "configured",
                                  "createdAt", "deviceId", "dType", "IS_MAINTENANCE", "IS_PREDICTIVE_MAINTENANCE", "last_change", "lastConfiguredAt", "lastFactoryResetAt", "live_algo_status",
                                  "lp", "macId", "modelnumber", "ns", "online", "OTAUpdatedAt", "ports", "proto_updatedAt", "protocolname", "status", "temp", "updatedAt", "version"]:
                    df_devices.drop(columns=column, inplace=True)
            df_devices.to_parquet("devices.parquet", use_deprecated_int96_timestamps=True, index=False)
            df_devices = pd.read_parquet("devices.parquet")
            df_devices = df_devices.drop_duplicates(subset=["_id"])
            print("Devices Data:\n", df_devices.head(1))  # Debugging Output

        # Ports Data
        df_ports = pd.DataFrame(db.ports.find())
        if df_ports.empty:
            logging.warning("No data found in 'ports' collection.")
        else:
            df_ports = df_ports.astype({"_id": "string", "device": "string", "groupId": "string"}, errors="ignore")
            df_ports = df_ports.astype("string", errors="ignore")
            df_ports.to_parquet("ports.parquet", use_deprecated_int96_timestamps=True, index=False)
            df_ports = pd.read_parquet("ports.parquet")
            print("Ports Data:\n", df_ports.head(1))

        # Clients Data
        df_client = pd.DataFrame(db.clients.find())
        if df_client.empty:
            logging.warning("No data found in 'clients' collection.")
        else:
            df_client["_id"] = df_client["_id"].astype(str)
            for col in ["users", "feature_configuration"]:
                if col in df_client.columns:
                    df_client.drop(col, axis=1, inplace=True)
            if "devices" in df_client.columns:
                df_client["devices"] = df_client["devices"].apply(lambda x: list(map(str, x)) if isinstance(x, list) else x)
                df_client = df_client.explode("devices")
            for column in df_client.columns:
                if column not in ["_id", "devices", "n", "e", "m", "addr", "pin", "city", "state",
                                  "country", "createdAt", "updatedAt", "__v", "ct"]:
                    df_client.drop(columns=column, inplace=True)
            df_client.to_parquet("clients.parquet", use_deprecated_int96_timestamps=True, index=False)
            df_client = pd.read_parquet("clients.parquet")
            print("Clients Data:\n", df_client.head(1))  # Debugging Output

        # Device Map Data
        df_deviceMap = pd.DataFrame(client.lightson.deviceMap.find())
        if df_deviceMap.empty:
            logging.warning("No data found in 'deviceMap' collection.")
        else:
            df_deviceMap["_id"] = df_deviceMap["_id"].astype(str)
            df_deviceMap["ac_age"] = df_deviceMap["ac_age"].astype(str)
            if "clientID" in df_deviceMap.columns:
                df_deviceMap["clientID"] = df_deviceMap["clientID"].astype(str)
            if "devices" in df_deviceMap.columns:
                df_deviceMap["devices"] = df_deviceMap["devices"].apply(lambda x: list(map(str, x)) if isinstance(x, list) else x)
                df_deviceMap = df_deviceMap.explode("devices")
            if "facilityId" in df_deviceMap.columns:
                df_deviceMap["facilityId"] = df_deviceMap["facilityId"].astype(str)
            # Flatten the nested toggle_threshold dict into toggle_threshold_* columns and join them side by side
            df_devicemap2 = df_deviceMap.copy(deep=True).reset_index(drop=True)
            df_deviceMap = pd.concat(
                [df_devicemap2.drop("toggle_threshold", axis=1),
                 pd.json_normalize(df_devicemap2["toggle_threshold"], sep="_").add_prefix("toggle_threshold_")],
                axis=1)
            for column in df_deviceMap.columns:
                if column not in ["_id", "__v", "ac_age", "ac_brand", "ac_model_number", "ac_ton", "ac_type", "atm_id", "auto_params_calculation", "city", "client_type", "clientID", "cool_down",
                                  "createdAt", "current_kwh", "current_threshold", "devices", "Efan", "fan_consumption", "fan_off_equal", "fan_threshold", "fm_name", "fromdate", "installation_date",
                                  "latest_ES_Date", "Meter", "mode_threshold", "facilityId", "model", "NAC", "perUnitCost", "phase", "power_correction", "ref_kwh", "run_hrs", "sched_lock",
                                  "serialNo", "SIM_number", "site", "site_category", "site_status", "temp_threshold", "todate", "total_real_energy", "updatedAt",
                                  "sched_executed_at", "sched_executed_pwr", "sched_imminent", "sched_last_mismatch_at", "sched_corrected_at", "star_rating", "toggle_threshold_on2off",
                                  "toggle_threshold_off2on", "toggle_threshold_notfan2fan", "toggle_threshold_fan2notfan"]:
                    df_deviceMap.drop(columns=column, inplace=True)
            df_deviceMap.to_parquet("deviceMap.parquet", use_deprecated_int96_timestamps=True, index=False)
            df_deviceMap = pd.read_parquet("deviceMap.parquet").drop_duplicates(subset=['_id'], keep='first').reset_index(drop=True)
            print("Device Map Data:\n", df_deviceMap.head(1))

        # daily_dump
        df_dump = pd.read_csv(f"https://docs.google.com/spreadsheets/d/{'16CfL8N28-2nqUxWkbRGQVKlGViW0SoGlbsX4cUIzxKY'}/export?format=csv")
        df_dump.columns = df_dump.columns.str.replace(' ', '_')
        df_dump = df_dump.astype("string", errors="ignore")
        df_dump["Date"] = pd.to_datetime(df_dump["Date"], format="%d/%m/%Y", errors="coerce")
        df_dump.to_parquet("daily_dump.parquet", index=False, use_deprecated_int96_timestamps=True)
        df_dump = pd.read_parquet("daily_dump.parquet")

        # Reload every parquet file so all DataFrames carry the on-disk schema
        df_deviceMap = pd.read_parquet("deviceMap.parquet")
        df_clients = pd.read_parquet("clients.parquet")
        df_ports = pd.read_parquet("ports.parquet")
        df_nes = pd.read_parquet("New_Energy_Saving.parquet")
        df_devices = pd.read_parquet("devices.parquet")
        df_dump = pd.read_parquet("daily_dump.parquet")

        session = boto3.Session(
            aws_access_key_id="AKIARXCGNNYDRHFBBB6I",
            aws_secret_access_key="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",
            region_name="ap-south-1")

        s3_path = "s3://daily-dump-bucket/daily_dump"

        conn_redshift = wr.redshift.connect(secret_id="sqlworkbench!d5e4ab06-e3af-4c43-9bb9-d48649ab72cb", boto3_session=session)

        # Loop through the DataFrames and copy each one into Redshift
        dfs = {"devicemap": df_deviceMap, "clients": df_clients, "ports": df_ports, "new_energy_saving": df_nes, "devices": df_devices, "daily_dump": df_dump}
        for name, df in dfs.items():
            wr.redshift.copy(df=df,
                             path=s3_path,
                             con=conn_redshift,
                             schema="lightson",
                             table=name,
                             mode="overwrite",
                             varchar_lengths_default=65535,
                             boto3_session=session)
            time.sleep(5)  # Delay to prevent rapid successive writes

        # Append today's dump to the historical table before returning
        wr.redshift.copy(df=df_dump,
                         path=s3_path,
                         con=conn_redshift,
                         schema="lightson",
                         table="daily_dump_historical",
                         mode="append",
                         varchar_lengths_default=65535,
                         boto3_session=session)

        logging.info("Data migration completed successfully.")
        return df_devices, df_ports, df_client, df_deviceMap, df_nes, df_dump

    except errors.ServerSelectionTimeoutError:
        logging.error("MongoDB connection failed! Is the server running?")
        return None
    except Exception as e:
        logging.error(f"Unexpected error: {e}")
        return None


# Example usage
try:
    client = MongoClient("mongodb://localhost:9001/lightson?directConnection=true&serverSelectionTimeoutMS=5000")
    logging.info("MongoDB client created successfully.")

    df_devices, df_ports, df_client, df_deviceMap, df_nes, df_dump = data_migration(client)

    # Final check if data was retrieved
    if df_nes is not None:
        print("\nMigration completed successfully.")
    else:
        print("\nMigration failed. Check logs for details.")
except Exception as e:
    logging.error(f"Fatal error in script: {e}")

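# Optional post-load check (not in the original script): a minimal sketch that counts the rows
# landed in each Redshift table written above, reusing a wr.redshift connection like the one
# opened in data_migration. Table names mirror the keys of the dfs dict; adjust if yours differ.
def verify_redshift_counts(conn_redshift, tables=("devicemap", "clients", "ports", "new_energy_saving", "devices", "daily_dump")):
    for table in tables:
        count_df = wr.redshift.read_sql_query(f"SELECT COUNT(*) AS n FROM lightson.{table}", con=conn_redshift)
        print(f"lightson.{table}: {int(count_df['n'].iloc[0])} rows")
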
#---------------------------------------------dls migration ------------------------------

import pandas as pd
import boto3
import awswrangler as wr
from datetime import datetime, timedelta
from pymongo import MongoClient
from pathlib import Path
import os
import polars as pl
from pytz import timezone

# Constants
UTC = timezone('UTC')
IST = timezone('Asia/Kolkata')

def main():
    try:
        # Initialize connections and settings
        client = MongoClient("mongodb://localhost:9001/lightson?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+1.8.0")
        db = client['lightson']
        collection = db['dls']

        s3_path = "s3://daily-dump-bucket/ankur-v"
        session = boto3.Session(
            aws_access_key_id="AKIARXCGNNYDRHFBBB6I",
            aws_secret_access_key="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",
            region_name="ap-south-1"
        )

        conn_redshift = wr.redshift.connect(secret_id="sqlworkbench!d5e4ab06-e3af-4c43-9bb9-d48649ab72cb", boto3_session=session)

        # Clean up previous file if it exists
        Path("dls.csv").unlink(missing_ok=True)

        # Batch size for processing
        batch_size = 200000
        output_file = "dls.csv"

        # Yesterday's timestamp, localized to IST and converted to naive UTC
        # (IST.localize avoids the LMT offset that .replace(tzinfo=IST) would give with pytz)
        start_date = IST.localize(datetime.today() - timedelta(days=1)).astimezone(UTC).replace(tzinfo=None)

        # Define the expected columns (schema) for the CSV file
        columns = [
            "_id", "__v", "m", "t", "pl_m", "pl_v", "pl_Algo_flag", "pl_pir", "pl_DS", "pl_F",
            "pl_https_exp", "pl_Humidity", "pl_LMF", "pl_Mo", "pl_mqtts_exp", "pl_multiple_flag",
            "pl_NS", "pl_online", "dId", "cId", "ports", "updatedAt", "createdAt",
            "documentUpdatedAt", "pl_tz_set", "pl_timestamp", "pl_oem",
            "pl_off_pkt", "pl_OL", "pl_P", "pl_pf", "pl_ph_ang", "pl_proto", "pl_publish",
            "pl_room_humid", "pl_room_temp", "pl_sens_pos", "pl_single_flag", "pl_T",
            "pl_Tempc", "pl_Time", "pl_tot_run_hrs", "pl_tz", "pl_meter", "pl_voltage",
            "pl_current", "pl_power", "pl_freq", "pl_kwh", "pl_kwh_5min",
            "pl_run_hrs_5min", "pl_ac_p", "pl_ac_mo", "pl_ac_fs", "pl_ac_temp",
            "pl_ac_hs", "pl_ac_vs", "pl_rssi"
        ]

        # Function to process and append chunks to CSV
        def process_and_append_to_csv(skip, first_chunk):
            cursor = collection.find(
                {
                    "t": "p",
                    "createdAt": {"$gte": start_date},
                },
                {"_id": 1, "__v": 1, "m": 1, "t": 1, "pl": 1, "dId": 1, "cId": 1, "pVal": 1,
                 "ports": 1, "updatedAt": 1, "createdAt": 1, "documentUpdatedAt": 1}
            ).skip(skip).limit(batch_size)

            data = list(cursor)
            if not data:
                return False

            df = pd.json_normalize(data, sep="_")
            print(df["updatedAt"].min(), df["updatedAt"].max())

            if "ports" in df.columns:
                df["ports"] = df["ports"].apply(lambda x: str(x[0]) if isinstance(x, list) and x else None)

            df.loc[:, ~df.columns.isin(["updatedAt", "createdAt", "documentUpdatedAt"])] = \
                df.loc[:, ~df.columns.isin(["updatedAt", "createdAt", "documentUpdatedAt"])].astype(str)

            for col in columns:
                if col not in df.columns:
                    df[col] = None

            df["pl_power"] = df['pl_power'].replace('nan', None)
            df["pl_Humidity"] = df['pl_Humidity'].replace('nan', None)
            df["pl_Tempc"] = df['pl_Tempc'].replace('nan', None)
            df["pl_room_temp"] = df['pl_room_temp'].replace('nan', None)
            df["pl_kwh_5min"] = df['pl_kwh_5min'].replace('nan', None)

            df = df[columns]
            df.to_csv(output_file, mode='a', header=first_chunk, index=False)
            return True

        # Process the data in chunks
        skip = 0
        first_chunk = True
        chunk_number = 0

        while True:
            if not process_and_append_to_csv(skip, first_chunk):
                break

            chunk_number += 1
            print(f"Chunk {chunk_number} successfully appended.")
            skip += batch_size
            first_chunk = False

        print(f"Data has been written to {output_file}. Total chunks processed: {chunk_number}")

        # Convert CSV to Parquet
        df = pl.scan_csv("dls.csv", infer_schema=False)
        cols = df.collect_schema().names()
        print(f"Columns count: {len(cols)}, Unique columns (case-insensitive): {len(set(x.lower() for x in cols))}")

        df_final = df.with_columns(
            pl.col("createdAt").str.to_datetime("%Y-%m-%d %H:%M:%S%.f", strict=False).alias("createdAt"),
            pl.col("updatedAt").str.to_datetime("%Y-%m-%d %H:%M:%S%.f", strict=False).alias("updatedAt"),
            pl.col("documentUpdatedAt").str.to_datetime("%Y-%m-%d %H:%M:%S%.f", strict=False).alias("documentUpdatedAt"),
        ).collect().write_parquet(
            "dls_updated.parquet",
            compression="snappy",
            use_pyarrow=True,
            pyarrow_options={"use_deprecated_int96_timestamps": True}
        )

        # Filter out null createdAt records
        df2 = pl.scan_parquet("dls_updated.parquet")
        df2.filter(pl.col("createdAt").is_not_null()).collect().write_parquet(
            "dls_updated_final2.parquet",
            compression="snappy",
            use_pyarrow=True,
            pyarrow_options={"use_deprecated_int96_timestamps": True}
        )

        # Verify data
        dls_df = pl.scan_parquet("dls_updated_final2.parquet")
        min_date, max_date = dls_df.select(pl.min("updatedAt")).collect(), dls_df.select(pl.max("updatedAt")).collect()
        print(f"Data range - Min: {min_date}, Max: {max_date}")
        print("dls is ready")

        # Final cleanup and upload preparation
        df = pd.read_parquet("dls_updated_final2.parquet")
        df[['pl_power', 'pl_Humidity', 'pl_Tempc', 'pl_room_temp', 'pl_kwh_5min']] = \
            df[['pl_power', 'pl_Humidity', 'pl_Tempc', 'pl_room_temp', 'pl_kwh_5min']].replace('nan', None)
        df = df.dropna(axis=1, how='all')
        df.to_parquet("dls_updated_final2.parquet", use_deprecated_int96_timestamps=True)

        print("Data migration completed successfully!")

    except Exception as e:
        print(f"Error during data migration: {str(e)}")
        raise
    finally:
        # Clean up resources
        if 'client' in locals():
            client.close()
        if 'conn_redshift' in locals():
            conn_redshift.close()


if __name__ == "__main__":
    main()

# ==== AWS + S3 Config ====
aws_access_key_id = 'AKIARXCGNNYDRHFBBB6I'
aws_secret_access_key = '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'
region_name = 'ap-south-1'
bucket_name = 'ankur-v'
s3_output_key = 'dls.parquet'
local_file_path = 'dls_updated_final2.parquet'
s3_path = f"s3://{bucket_name}/{s3_output_key}"

# ==== Redshift Config ====
redshift_host = 'ml-test-1.cfppg9yhkagj.ap-south-1.redshift.amazonaws.com'
redshift_db = 'dev'
redshift_user = 'awsuser'
redshift_password = 'LtRedshift-2024'
redshift_port = 5439

# ==== SQLAlchemy Config ====
engine = create_engine(
    f'postgresql+psycopg2://{redshift_user}:{redshift_password}@{redshift_host}:{redshift_port}/{redshift_db}'
)
Session = sessionmaker(bind=engine)

# ==== Step 1: Upload to S3 ====
def upload_to_s3():
    print("🔼 Uploading file to S3...")
    session = boto3.Session(
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name=region_name
    )
    s3 = session.resource('s3')
    s3.meta.client.upload_file(Filename=local_file_path, Bucket=bucket_name, Key=s3_output_key)
    print("✅ File uploaded to S3.")


# ==== Step 2: Copy to Redshift (dls_today) ====
def copy_to_redshift():
    print("⬇️ Copying data into Redshift temporary table...")
    conn = psycopg2.connect(
        host=redshift_host,
        port=redshift_port,
        database=redshift_db,
        user=redshift_user,
        password=redshift_password
    )
    cursor = conn.cursor()
    cursor.execute("TRUNCATE TABLE lightson.dls_today;")
    copy_query = f"""
        COPY lightson.dls_today
        FROM '{s3_path}'
        CREDENTIALS 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}'
        FORMAT AS PARQUET;
    """
    cursor.execute(copy_query)
    conn.commit()  # commit the TRUNCATE + COPY; psycopg2 does not autocommit by default
    cursor.close()
    conn.close()
    print("✅ Copied data to dls_today.")

# ==== Step 3: Merge data (delete + insert) ====
def merge_into_dls():
    print("🔄 Merging dls_today into dls...")
    try:
        with Session() as session:
            # Delete overlapping records
            delete_result = session.execute(text("""
                DELETE FROM lightson.dls
                WHERE _id IN (
                    SELECT _id FROM lightson.dls
                    INTERSECT
                    SELECT _id FROM lightson.dls_today
                )
            """))
            session.commit()
            print(f"🗑️ Rows deleted from dls: {delete_result.rowcount}")

        with Session() as session:
            # Insert new records
            insert_result = session.execute(text("""
                INSERT INTO lightson.dls (
                    _id, __v, m, t, pl_m, pl_v, pl_algo_flag, pl_ds, pl_f, pl_https_exp,
                    pl_humidity, pl_lmf, pl_mo, pl_mqtts_exp, pl_multiple_flag, pl_ns,
                    pl_online, did, cid, ports_0, updatedat, createdat, documentupdatedat,
                    pl_tz_set, pl_timestamp, pl_oem, pl_off_pkt, pl_ol, pl_p, pl_pf,
                    pl_ph_ang, pl_proto, pl_publish, pl_room_humid, pl_room_temp,
                    pl_sens_pos, pl_single_flag, pl_t, pl_tempc, pl_time, pl_tot_run_hrs,
                    pl_tz, pl_meter, pl_voltage, pl_current, pl_power, pl_freq, pl_kwh,
                    pl_kwh_5min, pl_run_hrs_5min, pl_ac_p, pl_ac_mo, pl_ac_fs, pl_ac_temp,
                    pl_ac_hs, pl_ac_vs, pl_rssi
                )
                SELECT
                    _id, __v, m, t, pl_m, pl_v, pl_algo_flag, pl_ds,
                    NULL AS pl_f, NULL AS pl_https_exp, pl_humidity,
                    NULL AS pl_lmf, NULL AS pl_mo, NULL AS pl_mqtts_exp,
                    NULL AS pl_multiple_flag, pl_ns, pl_online, did, cid,
                    ports AS ports_0, updatedat, createdat, documentupdatedat,
                    pl_tz_set, pl_timestamp, pl_oem, pl_off_pkt,
                    NULL AS pl_ol, NULL AS pl_p, pl_pf, pl_ph_ang,
                    pl_proto, NULL AS pl_publish, pl_room_humid,
                    pl_room_temp, pl_sens_pos, NULL AS pl_single_flag,
                    NULL AS pl_t, pl_tempc, pl_time, pl_tot_run_hrs,
                    NULL AS pl_tz, pl_meter, pl_voltage, pl_current,
                    pl_power, pl_freq, pl_kwh, pl_kwh_5min, pl_run_hrs_5min,
                    pl_ac_p, pl_ac_mo, pl_ac_fs, pl_ac_temp, pl_ac_hs,
                    pl_ac_vs, pl_rssi
                FROM lightson.dls_today
            """))
            session.commit()
            print(f"✅ Rows inserted into dls: {insert_result.rowcount}")
    except Exception as e:
        print("❌ Merge failed:", e)
        traceback.print_exc()

# ==== Main Runner ====
if __name__ == "__main__":
    try:
        upload_to_s3()
        copy_to_redshift()
        merge_into_dls()
        print("🎉 Data pipeline complete.")
    except Exception as e:
        print("🔥 Fatal error:", e)
        traceback.print_exc()

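# Optional post-merge check (not in the original script): a minimal sketch that reuses the
# Session factory above to count how many rows landed in lightson.dls today. It assumes the
# createdat column is populated by the insert, which the column list above suggests.
def count_todays_dls_rows():
    with Session() as session:
        result = session.execute(text("SELECT COUNT(*) FROM lightson.dls WHERE createdat >= CURRENT_DATE"))
        print(f"Rows in lightson.dls with createdat >= today: {result.scalar()}")
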
#---------------------------------------------------------meter raw ------------------------------------------------------------

import pandas as pd
import boto3
import awswrangler as wr
from datetime import datetime, timedelta
from pymongo import MongoClient
from pathlib import Path
import os
import polars as pl
from pytz import timezone
from math import ceil
import time
import psycopg2
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
import traceback

# Constants
IST = timezone('Asia/Kolkata')
UTC = timezone('UTC')
CHUNK_SIZE = 100000

def meter_raw_migration():
    try:
        print("Starting meter_raw_migration process...")

        # ==== Configuration ====
        # Define start_date (yesterday) and end_date (now) in UTC
        # (IST.localize avoids the LMT offset that .replace(tzinfo=IST) would give with pytz)
        start_date = IST.localize(datetime.today() - timedelta(days=1)).astimezone(UTC).replace(tzinfo=None)
        end_date = datetime.utcnow()

        # AWS and Redshift configuration
        aws_access_key_id = 'AKIARXCGNNYDRHFBBB6I'
        aws_secret_access_key = 'LFBwt03wgZK43fJ70B6AsmrOhS5XMtncOyNaI6Zh'
        region_name = 'ap-south-1'
        bucket_name = 'ankur-v'
        s3_output_key = 'meterraw.parquet'
        redshift_host = 'ml-test-1.cfppg9yhkagj.ap-south-1.redshift.amazonaws.com'
        redshift_db = 'dev'
        redshift_user = 'awsuser'
        redshift_password = 'LtRedshift-2024'
        redshift_port = 5439

        # ==== Data Processing ====
        # Create output directory
        root_path = Path.cwd() / "meterRaw" / end_date.strftime("%Y_%m_%d")
        root_path.mkdir(parents=True, exist_ok=True)

        # Define schema and columns
        cols = [
            '_id', 'm', 'DS', 'MS', 'Power(W)', 'ac_power', 'angle', 'current_kwh',
            'fan_speed', 'freq', 'mode', 'serialNo', 'temp', 'updatedAt', 'room_temp',
            'room_humid', 'ac_hs', 'ac_vs', 'kwh_5min', 'v', 'timestamp', 'oem',
            'Algo_flag', 'createdAt', 'current_phase_1', 'current_phase_2',
            'current_phase_3', 'voltage_phase_1', 'voltage_phase_2', 'voltage_phase_3',
            'power_factor_phase_1', 'power_factor_phase_2', 'power_factor_phase_3'
        ]

        schema = {
            "_id": "string",
            "updatedAt": "datetime64[ns]",
            "createdAt": "datetime64[ns]",
            "m": "string",
            "current_phase_1": "Float64",
            "current_phase_2": "Float64",
            "current_phase_3": "Float64",
            "DS": "boolean",
            "MS": "boolean",
            "Power(W)": "Float64",
            "voltage_phase_1": "Float64",
            "voltage_phase_2": "Float64",
            "voltage_phase_3": "Float64",
            "ac_power": "Int64",
            "angle": "Float64",
            "current_kwh": "Float64",
            "fan_speed": "Int64",
            "freq": "Float64",
            "mode": "Int64",
            "power_factor_phase_1": "Float64",
            "power_factor_phase_2": "Float64",
            "power_factor_phase_3": "Float64",
            "serialNo": "string",
            "temp": "Float64",
            "room_temp": "Float64",
            "room_humid": "Float64",
            "ac_hs": "Int64",
            "ac_vs": "Int64",
            "kwh_5min": "Float64",
            "v": "string",
            "timestamp": "string",
            "oem": "string",
            "Algo_flag": "string"
        }

        # MongoDB connection
        client = MongoClient("mongodb://localhost:9001/lightson?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+1.8.0")
        db = client.get_database("lightson")
        coll = db.get_collection("meterRaw")

        # Query definitions: single-phase documents have no Current array, three-phase ones do
        query = {
            "single": {
                "Current.0": {"$exists": False}
            },
            "three": {
                "Current.0": {"$exists": True}
            }
        }

        # Helper function for chunk processing
        def get_chunk(rows: int, completed_count: int = 0, chunk_size=CHUNK_SIZE):
            chunks = ceil((rows - completed_count) / chunk_size)
            for i in range(chunks):
                skip = completed_count + (i * chunk_size)
                limit = min(rows - skip, chunk_size)
                yield skip, limit

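        # Illustration (comment only, not in the original): with chunk_size=100000 and
        # rows=250000, get_chunk yields (0, 100000), (100000, 100000), (200000, 50000),
        # i.e. skip/limit pairs that cover the matching documents without overlap.
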
        # ==== Process Single Phase Data ====
        print("Processing single phase data...")
        query_single_phase = query["single"].copy()
        query_single_phase.update({
            "updatedAt": {
                "$gte": start_date,
                "$lte": end_date
            }
        })

        count = coll.count_documents(query_single_phase)
        idx = 0

        for skip, limit in get_chunk(count):
            df = pd.DataFrame(list(coll.find(query_single_phase, skip=skip, limit=limit)))
            print(f"Processing single phase chunk {idx}: {skip} to {skip+limit} | Date range: {df['updatedAt'].min()} to {df['updatedAt'].max()}")

            df["_id"] = df["_id"].astype(str)
            df = df.rename(columns={
                "Current": "current_phase_1",
                "Voltage": "voltage_phase_1",
                "power_factor": "power_factor_phase_1"
            }).reindex(columns=cols).astype(schema)

            df.to_parquet(root_path / f"single_phase_idx_{idx}.parquet", use_deprecated_int96_timestamps=True)
            idx += 1

        # ==== Process Three Phase Data ====
        print("Processing three phase data...")
        query_three_phase = query["three"].copy()
        query_three_phase.update({
            "updatedAt": {
                "$gte": start_date,
                "$lte": end_date
            }
        })

        count = coll.count_documents(query_three_phase)
        idx = 0

        for skip, limit in get_chunk(count):
            df = pd.DataFrame(list(coll.find(query_three_phase, skip=skip, limit=limit)))
            print(f"Processing three phase chunk {idx}: {skip} to {skip+limit} | Date range: {df['updatedAt'].min()} to {df['updatedAt'].max()}")

            # Expand array columns into per-phase columns
            for key in ("Current", "Voltage", "power_factor"):
                array_col = df.pop(key)
                df[[f"{key.lower()}_phase_1", f"{key.lower()}_phase_2", f"{key.lower()}_phase_3"]] = array_col.to_list()

            df["_id"] = df["_id"].astype(str)
            df = df.reindex(columns=cols).astype(schema)
            df.to_parquet(root_path / f"three_phase_idx_{idx}.parquet", use_deprecated_int96_timestamps=True)
            idx += 1

        # ==== Combine and Finalize Data ====
        print("Combining all data files...")
        df = pl.scan_parquet(root_path / "*.parquet")
        final_filename = f"meterRaw_{end_date.strftime('%Y_%m_%d')}.parquet"
        df.collect().write_parquet(
            Path.cwd() / final_filename,
            compression="snappy",
            use_pyarrow=True,
            pyarrow_options={"use_deprecated_int96_timestamps": True}
        )

        # Clean up temporary files
        for children in root_path.glob("*.parquet"):
            children.unlink(missing_ok=True)
        root_path.rmdir()

        # Verify final file
        df2 = pl.scan_parquet(Path.cwd() / final_filename)
        date_range = df2.select([
            pl.col("createdAt").min().alias("min"),
            pl.col("createdAt").max().alias("max")
        ]).collect()
        print(f"Data range in final file: {date_range}")
        print("meterraw_is_generated")

        # ==== Upload to S3 ====
        print("🔼 Uploading MeterRaw to S3...")
        session = boto3.Session(
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=region_name
        )
        s3 = session.resource('s3')
        s3.meta.client.upload_file(
            Filename=final_filename,
            Bucket=bucket_name,
            Key=s3_output_key
        )
        print("✅ File uploaded to S3.")

        # ==== Redshift Operations ====
        # Initialize SQLAlchemy engine
        engine = create_engine(
            f'postgresql+psycopg2://{redshift_user}:{redshift_password}@{redshift_host}:{redshift_port}/{redshift_db}'
        )
        Session = sessionmaker(bind=engine)

        # ==== Copy to Redshift (meter_today) ====
        print("⬇️ Copying data into Redshift meter_today...")
        s3_path = f"s3://{bucket_name}/{s3_output_key}"

        conn = psycopg2.connect(
            host=redshift_host,
            port=redshift_port,
            database=redshift_db,
            user=redshift_user,
            password=redshift_password)

        cursor = conn.cursor()
        cursor.execute("TRUNCATE TABLE lightson.meter_today;")
        cursor.execute(f"""COPY lightson.meter_today FROM '{s3_path}' CREDENTIALS 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}' FORMAT AS PARQUET;""")
        conn.commit()
        print("✅ Copied data to meter_today.")

        # ==== Merge into meterraw ====
        print("🔄 Merging meter_today into meterraw...")
        try:
            with Session() as session:
                # Delete duplicates
                delete_result = session.execute(text("""
                    DELETE FROM lightson.meterraw
                    WHERE _id IN (
                        SELECT _id FROM lightson.meterraw
                        INTERSECT
                        SELECT _id FROM lightson.meter_today
                    )
                """))
                session.commit()
                print(f"🗑️ Deleted rows from meterraw: {delete_result.rowcount}")

                # Insert new records
                insert_result = session.execute(text("""
                    INSERT INTO lightson.meterraw (
                        _id, m, ds, ms, power_w_, ac_power, angle, current_kwh, fan_speed,
                        freq, mode, serialno, temp, updatedat, createdat, current_phase_1,
                        current_phase_2, current_phase_3, voltage_phase_1, voltage_phase_2,
                        voltage_phase_3, power_factor_phase_1, power_factor_phase_2, power_factor_phase_3,
                        room_temp, room_humid, ac_hs, ac_vs, kwh_5min, v, "timestamp", oem, algo_flag
                    )
                    SELECT
                        _id, m,
                        CAST(ds AS BOOLEAN),
                        CAST(ms AS BOOLEAN),
                        "power(w)" AS power_w_,
                        ac_power, angle, current_kwh, fan_speed, freq, mode, serialno, temp,
                        updatedat, createdat, current_phase_1, current_phase_2, current_phase_3,
                        voltage_phase_1, voltage_phase_2, voltage_phase_3,
                        power_factor_phase_1, power_factor_phase_2, power_factor_phase_3,
                        room_temp, room_humid, ac_hs, ac_vs, kwh_5min, v, "timestamp", oem, algo_flag
                    FROM lightson.meter_today
                """))
                session.commit()
                print(f"✅ Inserted rows into meterraw: {insert_result.rowcount}")

        except Exception as e:
            print("❌ Merge failed:", e)
            traceback.print_exc()
            raise

        print("🎉 MeterRaw pipeline complete.")

    except Exception as e:
        print("🔥 Fatal error in pipeline:", e)
        traceback.print_exc()
        raise
    finally:
        # Clean up resources
        if 'client' in locals():
            client.close()
        if 'engine' in locals():
            engine.dispose()
        print("Resources cleaned up.")


if __name__ == "__main__":
    meter_raw_migration()