!pip install sqlalchemy !pip install redshift-connector !pip install --upgrade sqlalchemy-redshift !pip install polars !pip install timedelta import time import logging import traceback from math import ceil from os import getenv from datetime import datetime, timedelta from pathlib import Path import pandas as pd import polars as pl import pytz from zoneinfo import ZoneInfo import boto3 import awswrangler as wr import redshift_connector import psycopg2 from pymongo import MongoClient, errors from pymongo.database import Database from pymongo.collection import Collection from sqlalchemy import create_engine, text from sqlalchemy.orm import sessionmaker IST = pytz.timezone('Asia/Kolkata') now = datetime.now(IST) print(now.strftime('%Y-%m-%d %H:%M:%S')) UTC = pytz.UTC # or: pytz.timezone('UTC') now = datetime.now(UTC) print(now) #----------------------port forwarding --------------------- import subprocess import socket import time def is_port_in_use(port): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: return s.connect_ex(('localhost', port)) == 0 def start_port_forward(): try: print("Starting port-forward...") # You can add full path to ./kubectl if needed subprocess.Popen( ['./kubectl', 'port-forward', '--namespace', 'mongo', 'mongo-db-dr-0', '9001:27017'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) print("Port-forward command issued.") except Exception as e: print(f"Error starting port-forward: {e}") def ensure_port_forward(): if is_port_in_use(9001): print("Port 9001 is already in use — no need to start port-forward.") else: print("Port 9001 is NOT in use — starting port-forward.") start_port_forward() time.sleep(5) # Wait a few seconds to let it establish if is_port_in_use(9001): print("Port-forward established successfully") else: print("Failed to establish port-forward ") if __name__ == "__main__": ensure_port_forward() #-----------------------------data migration start----------------------- # Configure logging to print instead of writing to a file logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") def data_migration(client): """Fetch, process, and store data from MongoDB.""" try: db = client["lightson"] logging.info("Connected to MongoDB successfully.") print(" Connection successfully established.") # New_Energy_Saving Data collection = db['new_energySaving'] end_date = datetime.today() - timedelta(days=1) # Ensure all required columns exist required_columns = ["_id", "serialNo", "devices", "mitigated_co2","et" , "total_kwh", "deviceMapID", "weather_min_temp", "weather_max_temp", "billing_amount", "updatedAt", "ac_fan_hrs", "ac_run_hrs", "algo_status", "clientID", "cost_reduction", "createdAt", "energy_savings_ref_kwh", "energy_savings_us_meter", "energy_savings_inv_factor", "energy_savings_us_calc", "energy_savings_savings_percent"] cursor = collection.find({"createdAt": {"$lt": end_date},}, {"_id": 1, "serialNo": 1, "devices": 1, "mitigated_co2": 1, "total_kwh": 1, "deviceMapID": 1, "weather": 1, "billing_amount": 1, "updatedAt": 1, "ac_fan_hrs": 1, "ac_run_hrs": 1, "algo_status": 1, "clientID": 1, "cost_reduction": 1, "createdAt": 1, "energy_savings": 1}) # Convert cursor to a list data = list(cursor) if not data: print("No data found in the specified date range.") else: df_nes = pd.json_normalize(data, sep='_') df_nes = df_nes.explode("devices") # Convert ObjectId fields to strings for col in ['_id', 'clientID', 'deviceMapID', 'devices', "serialNo"]: if col in df_nes.columns: df_nes[col] = df_nes[col].astype(str) # Convert numerical fields properly for col in ["mitigated_co2", "energy_savings_ref_kwh", "energy_savings_savings_percent"]: if col in df_nes.columns: df_nes[col] = pd.to_numeric(df_nes[col], errors="coerce") for col in required_columns: if col not in df_nes.columns: df_nes[col] = None df_nes = df_nes[required_columns] df_nes.drop_duplicates(subset='_id') df_nes["et"] = df_nes["et"].fillna("") df_nes.rename(columns={'weather_min_temp': 'min_temp','weather_max_temp': 'max_temp','energy_savings_ref_kwh': 'ref_kwh', 'energy_savings_us_meter': 'us_meter','energy_savings_inv_factor': 'inv_factor', 'energy_savings_us_calc': 'us_calc','energy_savings_savings_percent': 'savings_percent'}, inplace=True) # df_nes["et"] = df_nes["et"].fillna("0").astype(str) df_nes.to_parquet("New_Energy_Saving.parquet", use_deprecated_int96_timestamps=True, index=False, engine='pyarrow') df_nes = pd.read_parquet("New_Energy_Saving.parquet") print(df_nes.head(3)) print("New_Energy_Saving.parquet has been successfully saved.") # Devices Data df_devices = pd.DataFrame(db.devices.find()) if df_devices.empty: logging.warning("No data found in 'devices' collection.") else: df_devices["_id"] = df_devices["_id"].astype(str) df_devices["assignedTo"] = df_devices["assignedTo"].astype(str) if "ports" in df_devices.columns: df_devices["ports"] = df_devices["ports"].apply(lambda x: list(map(str, x)) if isinstance(x, list) else x) df_devices = df_devices.explode("ports") for column in df_devices.columns: if column not in ["_id","__v","algorithm_type","assignedTo","AV1_ENABLE","AV1_PENDING_STATUS","AV2_ENABLE","AV2_PENDING_STATUS","configured", "createdAt","deviceId","dType","IS_MAINTENANCE","IS_PREDICTIVE_MAINTENANCE","last_change","lastConfiguredAt","lastFactoryResetAt","live_algo_status", "lp","macId","modelnumber","ns","online","OTAUpdatedAt","ports","proto_updatedAt","protocolname","status","temp","updatedAt","version",]: df_devices.drop(columns=column, inplace=True) df_devices.to_parquet("devices.parquet", use_deprecated_int96_timestamps = True, index = False) df_devices = pd.read_parquet("devices.parquet") df_devices = df_devices.drop_duplicates(subset=["_id"]) print("Devices Data:\n", df_devices.head(1)) # Debugging Output # Ports Data df_ports = pd.DataFrame(db.ports.find()) if df_ports.empty: logging.warning("No data found in 'ports' collection.") else: df_ports = df_ports.astype({"_id": "string", "device": "string", "groupId": "string"}, errors="ignore") df_ports = df_ports.astype("string", errors="ignore") df_ports.to_parquet("ports.parquet", use_deprecated_int96_timestamps = True, index = False) df_ports = pd.read_parquet("ports.parquet") print("Ports Data:\n", df_ports.head(1)) # Clients Data df_client = pd.DataFrame(db.clients.find()) if df_client.empty: logging.warning("No data found in 'clients' collection.") else: df_client["_id"] = df_client["_id"].astype(str) for col in ["users", "feature_configuration"]: if col in df_client.columns: df_client.drop(col, axis=1, inplace=True) if "devices" in df_client.columns: df_client["devices"] = df_client["devices"].apply(lambda x: list(map(str, x)) if isinstance(x, list) else x) df_client = df_client.explode("devices") for column in df_client.columns: if column not in ["_id","devices","n","e","m","addr","pin","city","state", "country","createdAt","updatedAt","__v","ct"]: df_client.drop(columns=column, inplace=True) df_client.to_parquet("clients.parquet" , use_deprecated_int96_timestamps = True, index = False) df_client = pd.read_parquet("clients.parquet") print("Clients Data:\n", df_client.head(1)) # Debugging Output # Device Map Data df_deviceMap = pd.DataFrame(client.lightson.deviceMap.find()) if df_deviceMap.empty: logging.warning("No data found in 'deviceMap' collection.") else: df_deviceMap["_id"] = df_deviceMap["_id"].astype(str) df_deviceMap["ac_age"] = df_deviceMap["ac_age"].astype(str) if "clientID" in df_deviceMap.columns: df_deviceMap["clientID"] = df_deviceMap["clientID"].astype(str) if "devices" in df_deviceMap.columns: df_deviceMap["devices"] = df_deviceMap["devices"].apply(lambda x: list(map(str, x)) if isinstance(x, list) else x) df_deviceMap = df_deviceMap.explode("devices") if "facilityId" in df_deviceMap.columns: df_deviceMap["facilityId"] = df_deviceMap["facilityId"].astype(str) df_devicemap2 = df_deviceMap.copy(deep = True) df_deviceMap = pd.concat([df_devicemap2.drop("toggle_threshold", axis = 1), pd.json_normalize(df_devicemap2["toggle_threshold"], sep = "_").add_prefix("toggle_threshold_")]) for column in df_deviceMap.columns: if column not in ["_id","__v","ac_age","ac_brand","ac_model_number","ac_ton","ac_type","atm_id","auto_params_calculation","city","client_type","clientID","cool_down", "createdAt","current_kwh","current_threshold","devices","Efan","fan_consumption","fan_off_equal","fan_threshold","fm_name","fromdate","installation_date", "latest_ES_Date","Meter","mode_threshold","facilityId","model","NAC","perUnitCost","phase","power_correction","ref_kwh","run_hrs","sched_lock", "serialNo","SIM_number","site","site_category","site_status","temp_threshold","todate","total_real_energy","updatedAt","sched_lock", "sched_executed_at","sched_executed_pwr","sched_imminent","sched_last_mismatch_at","sched_corrected_at","star_rating","toggle_threshold_on2off", "toggle_threshold_off2on","toggle_threshold_notfan2fan","toggle_threshold_fan2notfan" ]: df_deviceMap.drop(columns=column, inplace=True) df_deviceMap.to_parquet("deviceMap.parquet", use_deprecated_int96_timestamps = True, index = False) df_deviceMap = pd.read_parquet("deviceMap.parquet") df_deviceMap = pd.read_parquet("deviceMap.parquet").drop_duplicates(subset=['_id'], keep='first').reset_index(drop=True) print("Device Map Data:\n", df_deviceMap.head(1)) # daily_dump df_dump = pd.read_csv(f"https://docs.google.com/spreadsheets/d/{'16CfL8N28-2nqUxWkbRGQVKlGViW0SoGlbsX4cUIzxKY'}/export?format=csv") df_dump.columns = df_dump.columns.str.replace(' ', '_') df_dump = df_dump.astype("string", errors="ignore") df_dump["Date"] = pd.to_datetime(df_dump["Date"], format = "%d/%m/%Y", errors="coerce") df_dump.to_parquet("daily_dump.parquet", index = False, use_deprecated_int96_timestamps=True) df_dump = pd.read_parquet("daily_dump.parquet") df_deviceMap = pd.read_parquet("deviceMap.parquet") df_clients = pd.read_parquet("clients.parquet") df_ports = pd.read_parquet("ports.parquet") df_nes = pd.read_parquet("New_Energy_Saving.parquet") df_devices = pd.read_parquet("devices.parquet") df_dump = pd.read_parquet("daily_dump.parquet") # Correct dictionary (keys should be strings, not DataFrames) dfs = {"devicemap": df_deviceMap,"clients": df_clients,"ports": df_ports,"New_Energy_Saving": df_nes,"devices": df_devices ,"daily_dump":df_dump} session = boto3.Session( aws_access_key_id = "AKIARXCGNNYDRHFBBB6I", aws_secret_access_key = "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", region_name="ap-south-1") s3_path = "s3://daily-dump-bucket/daily_dump" conn_redshift = wr.redshift.connect(secret_id="sqlworkbench!d5e4ab06-e3af-4c43-9bb9-d48649ab72cb", boto3_session=session) # Loop through DataFrames and print their sizes dfs = {"devicemap": df_deviceMap,"clients": df_clients,"ports": df_ports,"new_energy_saving": df_nes,"devices": df_devices ,"daily_dump": df_dump} for name, df in dfs.items(): wr.redshift.copy(df=df, path= s3_path, con=conn_redshift, schema="lightson", table=name, mode="overwrite", varchar_lengths_default=65535, boto3_session=session) time.sleep(5) # Delay to prevent rapid successive writes logging.info("Data migration completed successfully.") return df_devices, df_ports, df_client, df_deviceMap , df_nes , df_dump wr.redshift.copy(df=df_dump, path= s3_path, con=conn_redshift, schema="lightson", table=daily_dump_historical, mode="append", varchar_lengths_default=65535, boto3_session=session) except errors.ServerSelectionTimeoutError: logging.error(" MongoDB connection failed! Is the server running?") return None except Exception as e: logging.error(f" Unexpected error: {e}") return None # Example usage try: client = MongoClient("mongodb://localhost:9001/lightson?directConnection=true&serverSelectionTimeoutMS=5000") logging.info("MongoDB client created successfully.") df_devices, df_ports, df_client, df_deviceMap , df_nes , df_dump = data_migration(client) # Final check if data was retrieved if df_nes is not None: print("\n Migration completed successfully.") else: print("\n Migration failed. Check logs for details.") except Exception as e: logging.error(f" Fatal error in script: {e}") #---------------------------------------------dls migration ------------------------------ import pandas as pd import boto3 import awswrangler as wr from datetime import datetime, timedelta from pymongo import MongoClient from pathlib import Path import os import polars as pl from pytz import timezone # Constants UTC = timezone('UTC') IST = timezone('Asia/Kolkata') def main(): try: # Initialize connections and settings client = MongoClient("mongodb://localhost:9001/lightson?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+1.8.0") db = client['lightson'] collection = db['dls'] s3_path = "s3://daily-dump-bucket/ankur-v" session = boto3.Session( aws_access_key_id="AKIARXCGNNYDRHFBBB6I", aws_secret_access_key="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", region_name="ap-south-1" ) conn_redshift = wr.redshift.connect(secret_id="sqlworkbench!d5e4ab06-e3af-4c43-9bb9-d48649ab72cb", boto3_session=session) # Clean up previous file if exists Path("dls.csv").unlink(missing_ok=True) # Batch size for processing batch_size = 200000 output_file = "dls.csv" # Get yesterday's date in UTC (12:00 AM IST) start_date = (datetime.today() - timedelta(days=1)).replace(tzinfo=IST).astimezone(UTC).replace(tzinfo=None) # Define the expected columns (schema) for the CSV file columns = [ "_id", "__v", "m", "t", "pl_m", "pl_v", "pl_Algo_flag", "pl_pir", "pl_DS", "pl_F", "pl_https_exp", "pl_Humidity", "pl_LMF", "pl_Mo", "pl_mqtts_exp", "pl_multiple_flag", "pl_NS", "pl_online", "dId", "cId", "ports", "updatedAt", "createdAt", "documentUpdatedAt", "pl_tz_set", "pl_timestamp", "pl_oem", "pl_off_pkt", "pl_OL", "pl_P", "pl_pf", "pl_ph_ang", "pl_proto", "pl_publish", "pl_room_humid", "pl_room_temp", "pl_sens_pos", "pl_single_flag", "pl_T", "pl_Tempc", "pl_Time", "pl_tot_run_hrs", "pl_tz", "pl_meter", "pl_voltage", "pl_current", "pl_power", "pl_freq", "pl_kwh", "pl_kwh_5min", "pl_run_hrs_5min", "pl_ac_p", "pl_ac_mo", "pl_ac_fs", "pl_ac_temp", "pl_ac_hs", "pl_ac_vs", "pl_rssi" ] # Function to process and append chunks to CSV def process_and_append_to_csv(skip, first_chunk): cursor = collection.find( { "t": "p", "createdAt": {"$gte": start_date}, }, {"_id": 1, "__v": 1, "m": 1, "t": 1, "pl": 1, "dId": 1, "cId": 1, "pVal": 1, "ports": 1, "updatedAt": 1, "createdAt": 1, "documentUpdatedAt": 1} ).skip(skip).limit(batch_size) data = list(cursor) if not data: return False df = pd.json_normalize(data, sep="_") print(df["updatedAt"].min(), df["updatedAt"].max()) if "ports" in df.columns: df["ports"] = df["ports"].apply(lambda x: str(x[0]) if isinstance(x, list) and x else None) df.loc[:, ~df.columns.isin(["updatedAt", "createdAt", "documentUpdatedAt"])] = \ df.loc[:, ~df.columns.isin(["updatedAt", "createdAt", "documentUpdatedAt"])].astype(str) for col in columns: if col not in df.columns: df[col] = None df["pl_power"] = df['pl_power'].replace('nan', None) df["pl_Humidity"] = df['pl_Humidity'].replace('nan', None) df["pl_Tempc"] = df['pl_Tempc'].replace('nan', None) df["pl_room_temp"] = df['pl_room_temp'].replace('nan', None) df["pl_kwh_5min"] = df['pl_kwh_5min'].replace('nan', None) df = df[columns] df.to_csv(output_file, mode='a', header=first_chunk, index=False) return True # Processing the data in chunks skip = 0 first_chunk = True chunk_number = 0 while True: if not process_and_append_to_csv(skip, first_chunk): break chunk_number += 1 print(f"Chunk {chunk_number} successfully appended.") skip += batch_size first_chunk = False print(f"Data has been written to {output_file}. Total chunks processed: {chunk_number}") # Convert CSV to Parquet df = pl.scan_csv("dls.csv", infer_schema=False) cols = df.collect_schema().names() print(f"Columns count: {len(cols)}, Unique columns (case-insensitive): {len(set(x.lower() for x in cols))}") df_final = df.with_columns( pl.col("createdAt").str.to_datetime("%Y-%m-%d %H:%M:%S%.f", strict=False).alias("createdAt"), pl.col("updatedAt").str.to_datetime("%Y-%m-%d %H:%M:%S%.f", strict=False).alias("updatedAt"), pl.col("documentUpdatedAt").str.to_datetime("%Y-%m-%d %H:%M:%S%.f", strict=False).alias("documentUpdatedAt"), ).collect().write_parquet( "dls_updated.parquet", compression="snappy", use_pyarrow=True, pyarrow_options={"use_deprecated_int96_timestamps": True} ) # Filter out null createdAt records df2 = pl.scan_parquet("dls_updated.parquet") df2.filter(pl.col("createdAt").is_not_null()).collect().write_parquet( "dls_updated_final2.parquet", compression="snappy", use_pyarrow=True, pyarrow_options={"use_deprecated_int96_timestamps": True} ) # Verify data dls_df = pl.scan_parquet("dls_updated_final2.parquet") min_date, max_date = dls_df.select(pl.min("updatedAt")).collect(), dls_df.select(pl.max("updatedAt")).collect() print(f"Data range - Min: {min_date}, Max: {max_date}") print("dls is ready") # Final cleanup and upload preparation df = pd.read_parquet("dls_updated_final2.parquet") df[['pl_power','pl_Humidity','pl_Tempc','pl_room_temp','pl_kwh_5min']] = \ df[['pl_power','pl_Humidity','pl_Tempc','pl_room_temp','pl_kwh_5min']].replace('nan', None) df = df.dropna(axis=1, how='all') df.to_parquet("dls_updated_final2.parquet", use_deprecated_int96_timestamps=True) print("Data migration completed successfully!") except Exception as e: print(f"Error during data migration: {str(e)}") raise finally: # Clean up resources if 'client' in locals(): client.close() if 'conn_redshift' in locals(): conn_redshift.close() if __name__ == "__main__": main() # ==== AWS + S3 Config ==== aws_access_key_id = 'AKIARXCGNNYDRHFBBB6I' aws_secret_access_key = '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' region_name = 'ap-south-1' bucket_name = 'ankur-v' s3_output_key = 'dls.parquet' local_file_path = 'dls_updated_final2.parquet' s3_path = f"s3://{bucket_name}/{s3_output_key}" # ==== Redshift Config ==== redshift_host = 'ml-test-1.cfppg9yhkagj.ap-south-1.redshift.amazonaws.com' redshift_db = 'dev' redshift_user = 'awsuser' redshift_password = 'LtRedshift-2024' redshift_port = 5439 # ==== SQLAlchemy Config ==== engine = create_engine( f'postgresql+psycopg2://{redshift_user}:{redshift_password}@{redshift_host}:{redshift_port}/{redshift_db}' ) Session = sessionmaker(bind=engine) # ==== Step 1: Upload to S3 ==== def upload_to_s3(): print("🔼 Uploading file to S3...") session = boto3.Session( aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, region_name=region_name ) s3 = session.resource('s3') s3.meta.client.upload_file(Filename=local_file_path, Bucket=bucket_name, Key=s3_output_key) print("✅ File uploaded to S3.") # ==== Step 2: Copy to Redshift (dls_today) ==== def copy_to_redshift(): print("⬇️ Copying data into Redshift temporary table...") conn = psycopg2.connect( host=redshift_host, port=redshift_port, database=redshift_db, user=redshift_user, password=redshift_password ) cursor = conn.cursor() cursor.execute("truncate table lightson.dls_today;") copy_query = f""" COPY lightson.dls_today FROM '{s3_path}' CREDENTIALS 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}' FORMAT AS PARQUET; """ cursor.execute(copy_query) cursor.close() conn.close() print("✅ Copied data to dls_today.") # ==== Step 3: Merge data (delete + insert) ==== def merge_into_dls(): print("🔄 Merging dls_today into dls...") try: with Session() as session: # Delete overlapping records delete_result = session.execute(text(""" DELETE FROM lightson.dls WHERE _id IN ( SELECT _id FROM lightson.dls INTERSECT SELECT _id FROM lightson.dls_today ) """)) session.commit() print(f"🗑️ Rows deleted from dls: {delete_result.rowcount}") with Session() as session: # Insert new records insert_result = session.execute(text(""" INSERT INTO lightson.dls ( _id, __v, m, t, pl_m, pl_v, pl_algo_flag, pl_ds, pl_f, pl_https_exp, pl_humidity, pl_lmf, pl_mo, pl_mqtts_exp, pl_multiple_flag, pl_ns, pl_online, did, cid, ports_0, updatedat, createdat, documentupdatedat, pl_tz_set, pl_timestamp, pl_oem, pl_off_pkt, pl_ol, pl_p, pl_pf, pl_ph_ang, pl_proto, pl_publish, pl_room_humid, pl_room_temp, pl_sens_pos, pl_single_flag, pl_t, pl_tempc, pl_time, pl_tot_run_hrs, pl_tz, pl_meter, pl_voltage, pl_current, pl_power, pl_freq, pl_kwh, pl_kwh_5min, pl_run_hrs_5min, pl_ac_p, pl_ac_mo, pl_ac_fs, pl_ac_temp, pl_ac_hs, pl_ac_vs, pl_rssi ) SELECT _id, __v, m, t, pl_m, pl_v, pl_algo_flag, pl_ds, NULL AS pl_f, NULL AS pl_https_exp, pl_humidity, NULL AS pl_lmf, NULL AS pl_mo, NULL AS pl_mqtts_exp, NULL AS pl_multiple_flag, pl_ns, pl_online, did, cid, ports AS ports_0, updatedat, createdat, documentupdatedat, pl_tz_set, pl_timestamp, pl_oem, pl_off_pkt, NULL AS pl_ol, NULL AS pl_p, pl_pf, pl_ph_ang, pl_proto, NULL AS pl_publish, pl_room_humid, pl_room_temp, pl_sens_pos, NULL AS pl_single_flag, NULL AS pl_t, pl_tempc, pl_time, pl_tot_run_hrs, NULL AS pl_tz, pl_meter, pl_voltage, pl_current, pl_power, pl_freq, pl_kwh, pl_kwh_5min, pl_run_hrs_5min, pl_ac_p, pl_ac_mo, pl_ac_fs, pl_ac_temp, pl_ac_hs, pl_ac_vs, pl_rssi FROM lightson.dls_today """)) session.commit() print(f"✅ Rows inserted into dls: {insert_result.rowcount}") except Exception as e: print("❌ Merge failed:", e) traceback.print_exc() # ==== Main Runner ==== if __name__ == "__main__": try: upload_to_s3() copy_to_redshift() merge_into_dls() print("🎉 Data pipeline complete.") except Exception as e: print("🔥 Fatal error:", e) traceback.print_exc() #---------------------------------------------------------meter raw ------------------------------------------------------------ import pandas as pd import boto3 import awswrangler as wr from datetime import datetime, timedelta from pymongo import MongoClient from pathlib import Path import os import polars as pl from pytz import timezone from math import ceil import time import psycopg2 from sqlalchemy import create_engine, text from sqlalchemy.orm import sessionmaker import traceback # Constants IST = timezone('Asia/Kolkata') UTC = timezone('UTC') CHUNK_SIZE = 100000 def meter_raw_migration(): try: print("Starting meter_raw_migration process...") # ==== Configuration ==== # Define start_date (yesterday) and end_date (today) in UTC start_date = (datetime.today() - timedelta(days=1)).replace(tzinfo=IST).astimezone(UTC).replace(tzinfo=None) end_date = datetime.utcnow() # AWS and Redshift configuration aws_access_key_id = 'AKIARXCGNNYDRHFBBB6I' aws_secret_access_key = 'LFBwt03wgZK43fJ70B6AsmrOhS5XMtncOyNaI6Zh' region_name = 'ap-south-1' bucket_name = 'ankur-v' s3_output_key = 'meterraw.parquet' redshift_host = 'ml-test-1.cfppg9yhkagj.ap-south-1.redshift.amazonaws.com' redshift_db = 'dev' redshift_user = 'awsuser' redshift_password = 'LtRedshift-2024' redshift_port = 5439 # ==== Data Processing ==== # Create output directory root_path = Path.cwd() / "meterRaw" / end_date.strftime("%Y_%m_%d") root_path.mkdir(parents=True, exist_ok=True) # Define schema and columns cols = [ '_id', 'm', 'DS', 'MS', 'Power(W)', 'ac_power', 'angle', 'current_kwh', 'fan_speed', 'freq', 'mode', 'serialNo', 'temp', 'updatedAt', 'room_temp', 'room_humid', 'ac_hs', 'ac_vs', 'kwh_5min', 'v', 'timestamp', 'oem', 'Algo_flag', 'createdAt', 'current_phase_1', 'current_phase_2', 'current_phase_3', 'voltage_phase_1', 'voltage_phase_2', 'voltage_phase_3', 'power_factor_phase_1', 'power_factor_phase_2', 'power_factor_phase_3' ] schema = { "_id": "string", "updatedAt": "datetime64[ns]", "createdAt": "datetime64[ns]", "m": "string", "current_phase_1": "Float64", "current_phase_2": "Float64", "current_phase_3": "Float64", "DS": "boolean", "MS": "boolean", "Power(W)": "Float64", "voltage_phase_1": "Float64", "voltage_phase_2": "Float64", "voltage_phase_3": "Float64", "ac_power": "Int64", "angle": "Float64", "current_kwh": "Float64", "fan_speed": "Int64", "freq": "Float64", "mode": "Int64", "power_factor_phase_1": "Float64", "power_factor_phase_2": "Float64", "power_factor_phase_3": "Float64", "serialNo": "string", "temp": "Float64", "room_temp": "Float64", "room_humid": "Float64", "ac_hs": "Int64", "ac_vs": "Int64", "kwh_5min": "Float64", "v": "string", "timestamp": "string", "oem": "string", "Algo_flag": "string" } # MongoDB connection client = MongoClient("mongodb://localhost:9001/lightson?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+1.8.0") db = client.get_database("lightson") coll = db.get_collection("meterRaw") # Query definitions query = { "single": { "Current.0": {"$exists": False} }, "three": { "Current.0": {"$exists": True} } } # Helper function for chunk processing def get_chunk(rows: int, completed_count: int = 0, chunk_size=CHUNK_SIZE): chunks = ceil((rows - completed_count) / chunk_size) for i in range(chunks): skip = completed_count + (i * chunk_size) limit = min(rows - skip, chunk_size) yield skip, limit # ==== Process Single Phase Data ==== print("Processing single phase data...") query_single_phase = query["single"].copy() query_single_phase.update({ "updatedAt": { "$gte": start_date, "$lte": end_date } }) count = coll.count_documents(query_single_phase) idx = 0 for skip, limit in get_chunk(count): df = pd.DataFrame(list(coll.find(query_single_phase, skip=skip, limit=limit))) print(f"Processing single phase chunk {idx}: {skip} to {skip+limit} | Date range: {df['updatedAt'].min()} to {df['updatedAt'].max()}") df["_id"] = df["_id"].astype(str) df = df.rename(columns={ "Current": "current_phase_1", "Voltage": "voltage_phase_1", "power_factor": "power_factor_phase_1" }).reindex(columns=cols).astype(schema) df.to_parquet(root_path / f"single_phase_idx_{idx}.parquet", use_deprecated_int96_timestamps=True) idx += 1 # ==== Process Three Phase Data ==== print("Processing three phase data...") query_three_phase = query["three"].copy() query_three_phase.update({ "updatedAt": { "$gte": start_date, "$lte": end_date } }) count = coll.count_documents(query_three_phase) idx = 0 for skip, limit in get_chunk(count): df = pd.DataFrame(list(coll.find(query_three_phase, skip=skip, limit=limit))) print(f"Processing three phase chunk {idx}: {skip} to {skip+limit} | Date range: {df['updatedAt'].min()} to {df['updatedAt'].max()}") # Expand array columns for key in ("Current", "Voltage", "power_factor"): array_col = df.pop(key) df[[f"{key.lower()}_phase_1", f"{key.lower()}_phase_2", f"{key.lower()}_phase_3"]] = array_col.to_list() df["_id"] = df["_id"].astype(str) df = df.reindex(columns=cols).astype(schema) df.to_parquet(root_path / f"three_phase_idx_{idx}.parquet", use_deprecated_int96_timestamps=True) idx += 1 # ==== Combine and Finalize Data ==== print("Combining all data files...") df = pl.scan_parquet(root_path / "*.parquet") final_filename = f"meterRaw_{end_date.strftime('%Y_%m_%d')}.parquet" df.collect().write_parquet( Path.cwd() / final_filename, compression="snappy", use_pyarrow=True, pyarrow_options={"use_deprecated_int96_timestamps": True} ) # Clean up temporary files for children in root_path.glob("*.parquet"): children.unlink(missing_ok=True) root_path.rmdir() # Verify final file df2 = pl.scan_parquet(Path.cwd() / final_filename) date_range = df2.select([ pl.col("createdAt").min().alias("min"), pl.col("createdAt").max().alias("max") ]).collect() print(f"Data range in final file: {date_range}") print("meterraw_is_generated") # ==== Upload to S3 ==== print("🔼 Uploading MeterRaw to S3...") session = boto3.Session( aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, region_name=region_name ) s3 = session.resource('s3') s3.meta.client.upload_file( Filename=final_filename, Bucket=bucket_name, Key=s3_output_key ) print("✅ File uploaded to S3.") # ==== Redshift Operations ==== # Initialize SQLAlchemy engine engine = create_engine( f'postgresql+psycopg2://{redshift_user}:{redshift_password}@{redshift_host}:{redshift_port}/{redshift_db}' ) Session = sessionmaker(bind=engine) # ==== Copy to Redshift (meter_today) ==== print("⬇️ Copying data into Redshift meter_today...") s3_path = f"s3://{bucket_name}/{s3_output_key}" conn = psycopg2.connect( host=redshift_host, port=redshift_port, database=redshift_db, user=redshift_user, password=redshift_password) cursor = conn.cursor() cursor.execute("TRUNCATE TABLE lightson.meter_today;") cursor.execute(f"""COPY lightson.meter_today FROM '{s3_path}' CREDENTIALS 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}' FORMAT AS PARQUET;""") conn.commit() print("✅ Copied data to meter_today.") # ==== Merge into meterraw ==== print("🔄 Merging meter_today into meterraw...") try: with Session() as session: # Delete duplicates delete_result = session.execute(text(""" DELETE FROM lightson.meterraw WHERE _id IN ( SELECT _id FROM lightson.meterraw INTERSECT SELECT _id FROM lightson.meter_today ) """)) session.commit() print(f"🗑️ Deleted rows from meterraw: {delete_result.rowcount}") # Insert new records insert_result = session.execute(text(""" INSERT INTO lightson.meterraw ( _id, m, ds, ms, power_w_, ac_power, angle, current_kwh, fan_speed, freq, mode, serialno, temp, updatedat, createdat, current_phase_1, current_phase_2, current_phase_3, voltage_phase_1, voltage_phase_2, voltage_phase_3, power_factor_phase_1, power_factor_phase_2, power_factor_phase_3, room_temp, room_humid, ac_hs, ac_vs, kwh_5min, v, "timestamp", oem, algo_flag ) SELECT _id, m, CAST(ds AS BOOLEAN), CAST(ms AS BOOLEAN), "power(w)" AS power_w_, ac_power, angle, current_kwh, fan_speed, freq, mode, serialno, temp, updatedat, createdat, current_phase_1, current_phase_2, current_phase_3, voltage_phase_1, voltage_phase_2, voltage_phase_3, power_factor_phase_1, power_factor_phase_2, power_factor_phase_3, room_temp, room_humid, ac_hs, ac_vs, kwh_5min, v, "timestamp", oem, algo_flag FROM lightson.meter_today """)) session.commit() print(f"✅ Inserted rows into meterraw: {insert_result.rowcount}") except Exception as e: print("❌ Merge failed:", e) traceback.print_exc() raise print("🎉 MeterRaw pipeline complete.") except Exception as e: print("🔥 Fatal error in pipeline:", e) traceback.print_exc() raise finally: # Clean up resources if 'client' in locals(): client.close() if 'engine' in locals(): engine.dispose() print("Resources cleaned up.") if __name__ == "__main__": meter_raw_migration()