Data-Migration-Code/data_migration_end_to_end.ipynb

import time
import logging
import traceback
from math import ceil
from os import getenv
from datetime import datetime, timedelta
from pathlib import Path
import pandas as pd
import polars as pl
import pytz
from zoneinfo import ZoneInfo
import boto3
import awswrangler as wr
import redshift_connector
import psycopg2
from pymongo import MongoClient, errors
from pymongo.database import Database
from pymongo.collection import Collection
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
IST = pytz.timezone('Asia/Kolkata')
now = datetime.now(IST)
print(now.strftime('%Y-%m-%d %H:%M:%S'))
UTC = pytz.UTC # or: pytz.timezone('UTC')
now = datetime.now(UTC)
print(now)
#----------------------port forwarding ---------------------
import subprocess
import socket
import time
def is_port_in_use(port):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
return s.connect_ex(('localhost', port)) == 0
def start_port_forward():
try:
print("Starting port-forward...")
# Use the full path to kubectl here if it is not on PATH
subprocess.Popen(
['./kubectl', 'port-forward', '--namespace', 'mongo', 'mongo-db-dr-0', '9001:27017'],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
print("Port-forward command issued.")
except Exception as e:
print(f"Error starting port-forward: {e}")
def ensure_port_forward():
if is_port_in_use(9001):
print("Port 9001 is already in use — no need to start port-forward.")
else:
print("Port 9001 is NOT in use — starting port-forward.")
start_port_forward()
time.sleep(5) # Wait a few seconds to let it establish
if is_port_in_use(9001):
print("Port-forward established successfully")
else:
print("Failed to establish port-forward ")
if __name__ == "__main__":
ensure_port_forward()
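# Optional refinement (a sketch, not wired into ensure_port_forward above): instead of a
# fixed 5-second sleep, poll the forwarded port until it accepts connections or a deadline passes.
def wait_for_port(port, timeout=30, interval=1):
    """Return True once `port` accepts TCP connections, or False after `timeout` seconds."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        if is_port_in_use(port):
            return True
        time.sleep(interval)
    return False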
#-----------------------------data migration start-----------------------
# Configure logging to print instead of writing to a file
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
def data_migration(client):
"""Fetch, process, and store data from MongoDB."""
try:
db = client["lightson"]
logging.info("Connected to MongoDB successfully.")
print(" Connection successfully established.")
# New_Energy_Saving Data
collection = db['new_energySaving']
end_date = datetime.today() - timedelta(days=1)
# Ensure all required columns exist
required_columns = ["_id", "serialNo", "devices", "mitigated_co2","et" , "total_kwh", "deviceMapID", "weather_min_temp", "weather_max_temp", "billing_amount",
"updatedAt", "ac_fan_hrs", "ac_run_hrs", "algo_status", "clientID", "cost_reduction", "createdAt", "energy_savings_ref_kwh",
"energy_savings_us_meter", "energy_savings_inv_factor", "energy_savings_us_calc", "energy_savings_savings_percent"]
cursor = collection.find({"createdAt": {"$lt": end_date},},
{"_id": 1, "serialNo": 1, "devices": 1, "mitigated_co2": 1, "total_kwh": 1, "deviceMapID": 1, "weather": 1,
"billing_amount": 1, "updatedAt": 1, "ac_fan_hrs": 1, "ac_run_hrs": 1, "algo_status": 1,
"clientID": 1, "cost_reduction": 1, "createdAt": 1, "energy_savings": 1})
# Convert cursor to a list
data = list(cursor)
if not data:
print("No data found in the specified date range.")
else:
df_nes = pd.json_normalize(data, sep='_')
df_nes = df_nes.explode("devices")
# Convert ObjectId fields to strings
for col in ['_id', 'clientID', 'deviceMapID', 'devices', "serialNo"]:
if col in df_nes.columns:
df_nes[col] = df_nes[col].astype(str)
# Convert numerical fields properly
for col in ["mitigated_co2", "energy_savings_ref_kwh", "energy_savings_savings_percent"]:
if col in df_nes.columns:
df_nes[col] = pd.to_numeric(df_nes[col], errors="coerce")
for col in required_columns:
if col not in df_nes.columns:
df_nes[col] = None
df_nes = df_nes[required_columns]
df_nes = df_nes.drop_duplicates(subset='_id')  # assign the result, as done for the other collections
df_nes["et"] = df_nes["et"].fillna("")
df_nes.rename(columns={'weather_min_temp': 'min_temp','weather_max_temp': 'max_temp','energy_savings_ref_kwh': 'ref_kwh',
'energy_savings_us_meter': 'us_meter','energy_savings_inv_factor': 'inv_factor',
'energy_savings_us_calc': 'us_calc','energy_savings_savings_percent': 'savings_percent'}, inplace=True)
# df_nes["et"] = df_nes["et"].fillna("0").astype(str)
df_nes.to_parquet("New_Energy_Saving.parquet", use_deprecated_int96_timestamps=True, index=False, engine='pyarrow')
df_nes = pd.read_parquet("New_Energy_Saving.parquet")
print(df_nes.head(3))
print("New_Energy_Saving.parquet has been successfully saved.")
# Devices Data
df_devices = pd.DataFrame(db.devices.find())
if df_devices.empty:
logging.warning("No data found in 'devices' collection.")
else:
df_devices["_id"] = df_devices["_id"].astype(str)
df_devices["assignedTo"] = df_devices["assignedTo"].astype(str)
if "ports" in df_devices.columns:
df_devices["ports"] = df_devices["ports"].apply(lambda x: list(map(str, x)) if isinstance(x, list) else x)
df_devices = df_devices.explode("ports")
for column in df_devices.columns:
if column not in ["_id","__v","algorithm_type","assignedTo","AV1_ENABLE","AV1_PENDING_STATUS","AV2_ENABLE","AV2_PENDING_STATUS","configured",
"createdAt","deviceId","dType","IS_MAINTENANCE","IS_PREDICTIVE_MAINTENANCE","last_change","lastConfiguredAt","lastFactoryResetAt","live_algo_status",
"lp","macId","modelnumber","ns","online","OTAUpdatedAt","ports","proto_updatedAt","protocolname","status","temp","updatedAt","version",]:
df_devices.drop(columns=column, inplace=True)
df_devices.to_parquet("devices.parquet", use_deprecated_int96_timestamps = True, index = False)
df_devices = pd.read_parquet("devices.parquet")
df_devices = df_devices.drop_duplicates(subset=["_id"])
print("Devices Data:\n", df_devices.head(1)) # Debugging Output
# Ports Data
df_ports = pd.DataFrame(db.ports.find())
if df_ports.empty:
logging.warning("No data found in 'ports' collection.")
else:
df_ports = df_ports.astype({"_id": "string", "device": "string", "groupId": "string"}, errors="ignore")
df_ports = df_ports.astype("string", errors="ignore")
df_ports.to_parquet("ports.parquet", use_deprecated_int96_timestamps = True, index = False)
df_ports = pd.read_parquet("ports.parquet")
print("Ports Data:\n", df_ports.head(1))
# Clients Data
df_client = pd.DataFrame(db.clients.find())
if df_client.empty:
logging.warning("No data found in 'clients' collection.")
else:
df_client["_id"] = df_client["_id"].astype(str)
for col in ["users", "feature_configuration"]:
if col in df_client.columns:
df_client.drop(col, axis=1, inplace=True)
if "devices" in df_client.columns:
df_client["devices"] = df_client["devices"].apply(lambda x: list(map(str, x)) if isinstance(x, list) else x)
df_client = df_client.explode("devices")
for column in df_client.columns:
if column not in ["_id","devices","n","e","m","addr","pin","city","state",
"country","createdAt","updatedAt","__v","ct"]:
df_client.drop(columns=column, inplace=True)
df_client.to_parquet("clients.parquet" , use_deprecated_int96_timestamps = True, index = False)
df_client = pd.read_parquet("clients.parquet")
print("Clients Data:\n", df_client.head(1)) # Debugging Output
# Device Map Data
df_deviceMap = pd.DataFrame(client.lightson.deviceMap.find())
if df_deviceMap.empty:
logging.warning("No data found in 'deviceMap' collection.")
else:
df_deviceMap["_id"] = df_deviceMap["_id"].astype(str)
df_deviceMap["ac_age"] = df_deviceMap["ac_age"].astype(str)
if "clientID" in df_deviceMap.columns:
df_deviceMap["clientID"] = df_deviceMap["clientID"].astype(str)
if "devices" in df_deviceMap.columns:
df_deviceMap["devices"] = df_deviceMap["devices"].apply(lambda x: list(map(str, x)) if isinstance(x, list) else x)
df_deviceMap = df_deviceMap.explode("devices")
if "facilityId" in df_deviceMap.columns:
df_deviceMap["facilityId"] = df_deviceMap["facilityId"].astype(str)
df_devicemap2 = df_deviceMap.copy(deep=True).reset_index(drop=True)
# Attach the flattened toggle_threshold columns side by side (axis=1); without axis=1 the
# normalized frame would be appended as extra rows instead of columns
df_deviceMap = pd.concat([df_devicemap2.drop("toggle_threshold", axis=1),
pd.json_normalize(df_devicemap2["toggle_threshold"], sep="_").add_prefix("toggle_threshold_")], axis=1)
for column in df_deviceMap.columns:
if column not in ["_id","__v","ac_age","ac_brand","ac_model_number","ac_ton","ac_type","atm_id","auto_params_calculation","city","client_type","clientID","cool_down",
"createdAt","current_kwh","current_threshold","devices","Efan","fan_consumption","fan_off_equal","fan_threshold","fm_name","fromdate","installation_date",
"latest_ES_Date","Meter","mode_threshold","facilityId","model","NAC","perUnitCost","phase","power_correction","ref_kwh","run_hrs","sched_lock",
"serialNo","SIM_number","site","site_category","site_status","temp_threshold","todate","total_real_energy","updatedAt","sched_lock",
"sched_executed_at","sched_executed_pwr","sched_imminent","sched_last_mismatch_at","sched_corrected_at","star_rating","toggle_threshold_on2off",
"toggle_threshold_off2on","toggle_threshold_notfan2fan","toggle_threshold_fan2notfan"
]:
df_deviceMap.drop(columns=column, inplace=True)
df_deviceMap.to_parquet("deviceMap.parquet", use_deprecated_int96_timestamps = True, index = False)
df_deviceMap = pd.read_parquet("deviceMap.parquet")
df_deviceMap = pd.read_parquet("deviceMap.parquet").drop_duplicates(subset=['_id'], keep='first').reset_index(drop=True)
print("Device Map Data:\n", df_deviceMap.head(1))
# daily_dump
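# The /export?format=csv endpoint returns the spreadsheet's first worksheet; a specific tab
# can be exported by appending &gid=<sheet_gid> to the URL.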
df_dump = pd.read_csv(f"https://docs.google.com/spreadsheets/d/{'16CfL8N28-2nqUxWkbRGQVKlGViW0SoGlbsX4cUIzxKY'}/export?format=csv")
df_dump.columns = df_dump.columns.str.replace(' ', '_')
df_dump = df_dump.astype("string", errors="ignore")
df_dump["Date"] = pd.to_datetime(df_dump["Date"], format = "%d/%m/%Y", errors="coerce")
df_dump.to_parquet("daily_dump.parquet", index = False, use_deprecated_int96_timestamps=True)
df_dump = pd.read_parquet("daily_dump.parquet")
df_deviceMap = pd.read_parquet("deviceMap.parquet")
df_clients = pd.read_parquet("clients.parquet")
df_ports = pd.read_parquet("ports.parquet")
df_nes = pd.read_parquet("New_Energy_Saving.parquet")
df_devices = pd.read_parquet("devices.parquet")
df_dump = pd.read_parquet("daily_dump.parquet")
session = boto3.Session(
aws_access_key_id = "AKIARXCGNNYDRHFBBB6I",
aws_secret_access_key = "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",
region_name="ap-south-1")
s3_path = "s3://daily-dump-bucket/daily_dump"
conn_redshift = wr.redshift.connect(secret_id="sqlworkbench!d5e4ab06-e3af-4c43-9bb9-d48649ab72cb", boto3_session=session)
# Loop through the DataFrames and copy each one to its Redshift table via S3
dfs = {"devicemap": df_deviceMap,"clients": df_clients,"ports": df_ports,"new_energy_saving": df_nes,"devices": df_devices ,"daily_dump": df_dump}
for name, df in dfs.items():
wr.redshift.copy(df=df,
path= s3_path,
con=conn_redshift,
schema="lightson",
table=name,
mode="overwrite",
varchar_lengths_default=65535,
boto3_session=session)
time.sleep(5) # Delay to prevent rapid successive writes
logging.info("Data migration completed successfully.")
return df_devices, df_ports, df_client, df_deviceMap , df_nes , df_dump
wr.redshift.copy(df=df_dump,
path= s3_path,
con=conn_redshift,
schema="lightson",
table=daily_dump_historical,
mode="append",
varchar_lengths_default=65535,
boto3_session=session)
except errors.ServerSelectionTimeoutError:
logging.error(" MongoDB connection failed! Is the server running?")
return None
except Exception as e:
logging.error(f" Unexpected error: {e}")
return None
# Example usage
try:
client = MongoClient("mongodb://localhost:9001/lightson?directConnection=true&serverSelectionTimeoutMS=5000")
logging.info("MongoDB client created successfully.")
result = data_migration(client)
# data_migration returns None on failure, so check before unpacking
if result is not None:
df_devices, df_ports, df_client, df_deviceMap, df_nes, df_dump = result
print("\nMigration completed successfully.")
else:
print("\nMigration failed. Check logs for details.")
except Exception as e:
logging.error(f" Fatal error in script: {e}")
#---------------------------------------------dls migration ------------------------------
import pandas as pd
import boto3
import awswrangler as wr
from datetime import datetime, timedelta
from pymongo import MongoClient
from pathlib import Path
import os
import polars as pl
from pytz import timezone
# Constants
UTC = timezone('UTC')
IST = timezone('Asia/Kolkata')
def main():
try:
# Initialize connections and settings
client = MongoClient("mongodb://localhost:9001/lightson?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+1.8.0")
db = client['lightson']
collection = db['dls']
s3_path = "s3://daily-dump-bucket/ankur-v"
session = boto3.Session(
aws_access_key_id="AKIARXCGNNYDRHFBBB6I",
aws_secret_access_key="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",
region_name="ap-south-1"
)
conn_redshift = wr.redshift.connect(secret_id="sqlworkbench!d5e4ab06-e3af-4c43-9bb9-d48649ab72cb", boto3_session=session)
# Clean up previous file if exists
Path("dls.csv").unlink(missing_ok=True)
# Batch size for processing
batch_size = 200000
output_file = "dls.csv"
# Yesterday at this time in IST, converted to naive UTC
# (IST.localize avoids the fixed LMT +05:53 offset that .replace(tzinfo=IST) gives with pytz zones)
start_date = IST.localize(datetime.today() - timedelta(days=1)).astimezone(UTC).replace(tzinfo=None)
# Define the expected columns (schema) for the CSV file
columns = [
"_id", "__v", "m", "t", "pl_m", "pl_v", "pl_Algo_flag", "pl_pir", "pl_DS", "pl_F",
"pl_https_exp", "pl_Humidity", "pl_LMF", "pl_Mo", "pl_mqtts_exp", "pl_multiple_flag",
"pl_NS", "pl_online", "dId", "cId", "ports", "updatedAt", "createdAt",
"documentUpdatedAt", "pl_tz_set", "pl_timestamp", "pl_oem",
"pl_off_pkt", "pl_OL", "pl_P", "pl_pf", "pl_ph_ang", "pl_proto", "pl_publish",
"pl_room_humid", "pl_room_temp", "pl_sens_pos", "pl_single_flag", "pl_T",
"pl_Tempc", "pl_Time", "pl_tot_run_hrs", "pl_tz", "pl_meter", "pl_voltage",
"pl_current", "pl_power", "pl_freq", "pl_kwh", "pl_kwh_5min",
"pl_run_hrs_5min", "pl_ac_p", "pl_ac_mo", "pl_ac_fs", "pl_ac_temp",
"pl_ac_hs", "pl_ac_vs", "pl_rssi"
]
# Function to process and append chunks to CSV
def process_and_append_to_csv(skip, first_chunk):
cursor = collection.find(
{
"t": "p",
"createdAt": {"$gte": start_date},
},
{"_id": 1, "__v": 1, "m": 1, "t": 1, "pl": 1, "dId": 1, "cId": 1, "pVal": 1,
"ports": 1, "updatedAt": 1, "createdAt": 1, "documentUpdatedAt": 1}
).skip(skip).limit(batch_size)
data = list(cursor)
if not data:
return False
df = pd.json_normalize(data, sep="_")
print(df["updatedAt"].min(), df["updatedAt"].max())
if "ports" in df.columns:
df["ports"] = df["ports"].apply(lambda x: str(x[0]) if isinstance(x, list) and x else None)
df.loc[:, ~df.columns.isin(["updatedAt", "createdAt", "documentUpdatedAt"])] = \
df.loc[:, ~df.columns.isin(["updatedAt", "createdAt", "documentUpdatedAt"])].astype(str)
for col in columns:
if col not in df.columns:
df[col] = None
df["pl_power"] = df['pl_power'].replace('nan', None)
df["pl_Humidity"] = df['pl_Humidity'].replace('nan', None)
df["pl_Tempc"] = df['pl_Tempc'].replace('nan', None)
df["pl_room_temp"] = df['pl_room_temp'].replace('nan', None)
df["pl_kwh_5min"] = df['pl_kwh_5min'].replace('nan', None)
df = df[columns]
df.to_csv(output_file, mode='a', header=first_chunk, index=False)
return True
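# Note: skip/limit paging without a sort relies on the collection's natural order staying
# stable for the duration of the export; adding .sort("_id", 1) before .skip()/.limit(),
# or paging on an _id range, would make the chunk boundaries deterministic.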
# Processing the data in chunks
skip = 0
first_chunk = True
chunk_number = 0
while True:
if not process_and_append_to_csv(skip, first_chunk):
break
chunk_number += 1
print(f"Chunk {chunk_number} successfully appended.")
skip += batch_size
first_chunk = False
print(f"Data has been written to {output_file}. Total chunks processed: {chunk_number}")
# Convert CSV to Parquet
df = pl.scan_csv("dls.csv", infer_schema=False)
cols = df.collect_schema().names()
print(f"Columns count: {len(cols)}, Unique columns (case-insensitive): {len(set(x.lower() for x in cols))}")
df.with_columns(
pl.col("createdAt").str.to_datetime("%Y-%m-%d %H:%M:%S%.f", strict=False).alias("createdAt"),
pl.col("updatedAt").str.to_datetime("%Y-%m-%d %H:%M:%S%.f", strict=False).alias("updatedAt"),
pl.col("documentUpdatedAt").str.to_datetime("%Y-%m-%d %H:%M:%S%.f", strict=False).alias("documentUpdatedAt"),
).collect().write_parquet(
"dls_updated.parquet",
compression="snappy",
use_pyarrow=True,
pyarrow_options={"use_deprecated_int96_timestamps": True}
)
# Filter out null createdAt records
df2 = pl.scan_parquet("dls_updated.parquet")
df2.filter(pl.col("createdAt").is_not_null()).collect().write_parquet(
"dls_updated_final2.parquet",
compression="snappy",
use_pyarrow=True,
pyarrow_options={"use_deprecated_int96_timestamps": True}
)
# Verify data
dls_df = pl.scan_parquet("dls_updated_final2.parquet")
date_range = dls_df.select(pl.min("updatedAt").alias("min_updatedAt"), pl.max("updatedAt").alias("max_updatedAt")).collect()
print(f"Data range - {date_range}")
print("dls is ready")
# Final cleanup and upload preparation
df = pd.read_parquet("dls_updated_final2.parquet")
# dict form again, so None is treated as the replacement value rather than triggering pad-filling
df[['pl_power','pl_Humidity','pl_Tempc','pl_room_temp','pl_kwh_5min']] = \
df[['pl_power','pl_Humidity','pl_Tempc','pl_room_temp','pl_kwh_5min']].replace({'nan': None})
df = df.dropna(axis=1, how='all')
df.to_parquet("dls_updated_final2.parquet", use_deprecated_int96_timestamps=True)
print("Data migration completed successfully!")
except Exception as e:
print(f"Error during data migration: {str(e)}")
raise
finally:
# Clean up resources
if 'client' in locals():
client.close()
if 'conn_redshift' in locals():
conn_redshift.close()
if __name__ == "__main__":
main()
# ==== AWS + S3 Config ====
aws_access_key_id = 'AKIARXCGNNYDRHFBBB6I'
aws_secret_access_key = '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'
region_name = 'ap-south-1'
bucket_name = 'ankur-v'
s3_output_key = 'dls.parquet'
local_file_path = 'dls_updated_final2.parquet'
s3_path = f"s3://{bucket_name}/{s3_output_key}"
# ==== Redshift Config ====
redshift_host = 'ml-test-1.cfppg9yhkagj.ap-south-1.redshift.amazonaws.com'
redshift_db = 'dev'
redshift_user = 'awsuser'
redshift_password = 'LtRedshift-2024'
redshift_port = 5439
# ==== SQLAlchemy Config ====
engine = create_engine(
f'postgresql+psycopg2://{redshift_user}:{redshift_password}@{redshift_host}:{redshift_port}/{redshift_db}'
)
Session = sessionmaker(bind=engine)
# ==== Step 1: Upload to S3 ====
def upload_to_s3():
print("🔼 Uploading file to S3...")
session = boto3.Session(
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name
)
s3 = session.resource('s3')
s3.meta.client.upload_file(Filename=local_file_path, Bucket=bucket_name, Key=s3_output_key)
print("✅ File uploaded to S3.")
# ==== Step 2: Copy to Redshift (dls_today) ====
def copy_to_redshift():
print("⬇️ Copying data into Redshift temporary table...")
conn = psycopg2.connect(
host=redshift_host,
port=redshift_port,
database=redshift_db,
user=redshift_user,
password=redshift_password
)
cursor = conn.cursor()
cursor.execute("truncate table lightson.dls_today;")
copy_query = f"""
COPY lightson.dls_today
FROM '{s3_path}'
CREDENTIALS 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}'
FORMAT AS PARQUET;
"""
cursor.execute(copy_query)
conn.commit()  # commit so the TRUNCATE + COPY persist (psycopg2 does not autocommit)
cursor.close()
conn.close()
print("✅ Copied data to dls_today.")
# ==== Step 3: Merge data (delete + insert) ====
def merge_into_dls():
print("🔄 Merging dls_today into dls...")
try:
with Session() as session:
# Delete overlapping records
delete_result = session.execute(text("""
DELETE FROM lightson.dls
WHERE _id IN (
SELECT _id FROM lightson.dls
INTERSECT
SELECT _id FROM lightson.dls_today
)
"""))
session.commit()
print(f"🗑️ Rows deleted from dls: {delete_result.rowcount}")
with Session() as session:
# Insert new records
insert_result = session.execute(text("""
INSERT INTO lightson.dls (
_id, __v, m, t, pl_m, pl_v, pl_algo_flag, pl_ds, pl_f, pl_https_exp,
pl_humidity, pl_lmf, pl_mo, pl_mqtts_exp, pl_multiple_flag, pl_ns,
pl_online, did, cid, ports_0, updatedat, createdat, documentupdatedat,
pl_tz_set, pl_timestamp, pl_oem, pl_off_pkt, pl_ol, pl_p, pl_pf,
pl_ph_ang, pl_proto, pl_publish, pl_room_humid, pl_room_temp,
pl_sens_pos, pl_single_flag, pl_t, pl_tempc, pl_time, pl_tot_run_hrs,
pl_tz, pl_meter, pl_voltage, pl_current, pl_power, pl_freq, pl_kwh,
pl_kwh_5min, pl_run_hrs_5min, pl_ac_p, pl_ac_mo, pl_ac_fs, pl_ac_temp,
pl_ac_hs, pl_ac_vs, pl_rssi
)
SELECT
_id, __v, m, t, pl_m, pl_v, pl_algo_flag, pl_ds,
NULL AS pl_f, NULL AS pl_https_exp, pl_humidity,
NULL AS pl_lmf, NULL AS pl_mo, NULL AS pl_mqtts_exp,
NULL AS pl_multiple_flag, pl_ns, pl_online, did, cid,
ports AS ports_0, updatedat, createdat, documentupdatedat,
pl_tz_set, pl_timestamp, pl_oem, pl_off_pkt,
NULL AS pl_ol, NULL AS pl_p, pl_pf, pl_ph_ang,
pl_proto, NULL AS pl_publish, pl_room_humid,
pl_room_temp, pl_sens_pos, NULL AS pl_single_flag,
NULL AS pl_t, pl_tempc, pl_time, pl_tot_run_hrs,
NULL AS pl_tz, pl_meter, pl_voltage, pl_current,
pl_power, pl_freq, pl_kwh, pl_kwh_5min, pl_run_hrs_5min,
pl_ac_p, pl_ac_mo, pl_ac_fs, pl_ac_temp, pl_ac_hs,
pl_ac_vs, pl_rssi
FROM lightson.dls_today
"""))
session.commit()
print(f"✅ Rows inserted into dls: {insert_result.rowcount}")
except Exception as e:
print("❌ Merge failed:", e)
traceback.print_exc()
# ==== Main Runner ====
if __name__ == "__main__":
try:
upload_to_s3()
copy_to_redshift()
merge_into_dls()
print("🎉 Data pipeline complete.")
except Exception as e:
print("🔥 Fatal error:", e)
traceback.print_exc()
#---------------------------------------------------------meter raw ------------------------------------------------------------
import pandas as pd
import boto3
import awswrangler as wr
from datetime import datetime, timedelta
from pymongo import MongoClient
from pathlib import Path
import os
import polars as pl
from pytz import timezone
from math import ceil
import time
import psycopg2
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
import traceback
# Constants
IST = timezone('Asia/Kolkata')
UTC = timezone('UTC')
CHUNK_SIZE = 100000
def meter_raw_migration():
try:
print("Starting meter_raw_migration process...")
# ==== Configuration ====
# Define start_date (yesterday, IST-localized and converted to naive UTC) and end_date (now, UTC)
start_date = IST.localize(datetime.today() - timedelta(days=1)).astimezone(UTC).replace(tzinfo=None)
end_date = datetime.utcnow()
# AWS and Redshift configuration
aws_access_key_id = 'AKIARXCGNNYDRHFBBB6I'
aws_secret_access_key = 'LFBwt03wgZK43fJ70B6AsmrOhS5XMtncOyNaI6Zh'
region_name = 'ap-south-1'
bucket_name = 'ankur-v'
s3_output_key = 'meterraw.parquet'
redshift_host = 'ml-test-1.cfppg9yhkagj.ap-south-1.redshift.amazonaws.com'
redshift_db = 'dev'
redshift_user = 'awsuser'
redshift_password = 'LtRedshift-2024'
redshift_port = 5439
# ==== Data Processing ====
# Create output directory
root_path = Path.cwd() / "meterRaw" / end_date.strftime("%Y_%m_%d")
root_path.mkdir(parents=True, exist_ok=True)
# Define schema and columns
cols = [
'_id', 'm', 'DS', 'MS', 'Power(W)', 'ac_power', 'angle', 'current_kwh',
'fan_speed', 'freq', 'mode', 'serialNo', 'temp', 'updatedAt', 'room_temp',
'room_humid', 'ac_hs', 'ac_vs', 'kwh_5min', 'v', 'timestamp', 'oem',
'Algo_flag', 'createdAt', 'current_phase_1', 'current_phase_2',
'current_phase_3', 'voltage_phase_1', 'voltage_phase_2', 'voltage_phase_3',
'power_factor_phase_1', 'power_factor_phase_2', 'power_factor_phase_3'
]
schema = {
"_id": "string",
"updatedAt": "datetime64[ns]",
"createdAt": "datetime64[ns]",
"m": "string",
"current_phase_1": "Float64",
"current_phase_2": "Float64",
"current_phase_3": "Float64",
"DS": "boolean",
"MS": "boolean",
"Power(W)": "Float64",
"voltage_phase_1": "Float64",
"voltage_phase_2": "Float64",
"voltage_phase_3": "Float64",
"ac_power": "Int64",
"angle": "Float64",
"current_kwh": "Float64",
"fan_speed": "Int64",
"freq": "Float64",
"mode": "Int64",
"power_factor_phase_1": "Float64",
"power_factor_phase_2": "Float64",
"power_factor_phase_3": "Float64",
"serialNo": "string",
"temp": "Float64",
"room_temp": "Float64",
"room_humid": "Float64",
"ac_hs": "Int64",
"ac_vs": "Int64",
"kwh_5min": "Float64",
"v": "string",
"timestamp": "string",
"oem": "string",
"Algo_flag": "string"
}
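# The capitalized dtypes above (Int64, Float64, boolean) are pandas nullable extension types,
# so missing readings survive the .astype(schema) casts applied to each chunk below.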
# MongoDB connection
client = MongoClient("mongodb://localhost:9001/lightson?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+1.8.0")
db = client.get_database("lightson")
coll = db.get_collection("meterRaw")
# Query definitions
query = {
"single": {
"Current.0": {"$exists": False}
},
"three": {
"Current.0": {"$exists": True}
}
}
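# "Current.0" exists only when Current is a non-empty array, so the "single" query matches
# documents with scalar readings and "three" matches three-phase documents whose
# Current/Voltage/power_factor fields are arrays.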
# Helper function for chunk processing
def get_chunk(rows: int, completed_count: int = 0, chunk_size=CHUNK_SIZE):
chunks = ceil((rows - completed_count) / chunk_size)
for i in range(chunks):
skip = completed_count + (i * chunk_size)
limit = min(rows - skip, chunk_size)
yield skip, limit
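# Example: rows=250000 with CHUNK_SIZE=100000 yields the (skip, limit) pairs
# (0, 100000), (100000, 100000), (200000, 50000).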
# ==== Process Single Phase Data ====
print("Processing single phase data...")
query_single_phase = query["single"].copy()
query_single_phase.update({
"updatedAt": {
"$gte": start_date,
"$lte": end_date
}
})
count = coll.count_documents(query_single_phase)
idx = 0
for skip, limit in get_chunk(count):
df = pd.DataFrame(list(coll.find(query_single_phase, skip=skip, limit=limit)))
print(f"Processing single phase chunk {idx}: {skip} to {skip+limit} | Date range: {df['updatedAt'].min()} to {df['updatedAt'].max()}")
df["_id"] = df["_id"].astype(str)
df = df.rename(columns={
"Current": "current_phase_1",
"Voltage": "voltage_phase_1",
"power_factor": "power_factor_phase_1"
}).reindex(columns=cols).astype(schema)
df.to_parquet(root_path / f"single_phase_idx_{idx}.parquet", use_deprecated_int96_timestamps=True)
idx += 1
# ==== Process Three Phase Data ====
print("Processing three phase data...")
query_three_phase = query["three"].copy()
query_three_phase.update({
"updatedAt": {
"$gte": start_date,
"$lte": end_date
}
})
count = coll.count_documents(query_three_phase)
idx = 0
for skip, limit in get_chunk(count):
df = pd.DataFrame(list(coll.find(query_three_phase, skip=skip, limit=limit)))
print(f"Processing three phase chunk {idx}: {skip} to {skip+limit} | Date range: {df['updatedAt'].min()} to {df['updatedAt'].max()}")
# Expand array columns
for key in ("Current", "Voltage", "power_factor"):
array_col = df.pop(key)
df[[f"{key.lower()}_phase_1", f"{key.lower()}_phase_2", f"{key.lower()}_phase_3"]] = array_col.to_list()
df["_id"] = df["_id"].astype(str)
df = df.reindex(columns=cols).astype(schema)
df.to_parquet(root_path / f"three_phase_idx_{idx}.parquet", use_deprecated_int96_timestamps=True)
idx += 1
# ==== Combine and Finalize Data ====
print("Combining all data files...")
df = pl.scan_parquet(root_path / "*.parquet")
final_filename = f"meterRaw_{end_date.strftime('%Y_%m_%d')}.parquet"
df.collect().write_parquet(
Path.cwd() / final_filename,
compression="snappy",
use_pyarrow=True,
pyarrow_options={"use_deprecated_int96_timestamps": True}
)
# Clean up temporary files
for children in root_path.glob("*.parquet"):
children.unlink(missing_ok=True)
root_path.rmdir()
# Verify final file
df2 = pl.scan_parquet(Path.cwd() / final_filename)
date_range = df2.select([
pl.col("createdAt").min().alias("min"),
pl.col("createdAt").max().alias("max")
]).collect()
print(f"Data range in final file: {date_range}")
print("meterraw_is_generated")
# ==== Upload to S3 ====
print("🔼 Uploading MeterRaw to S3...")
session = boto3.Session(
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name
)
s3 = session.resource('s3')
s3.meta.client.upload_file(
Filename=final_filename,
Bucket=bucket_name,
Key=s3_output_key
)
print("✅ File uploaded to S3.")
# ==== Redshift Operations ====
# Initialize SQLAlchemy engine
engine = create_engine(
f'postgresql+psycopg2://{redshift_user}:{redshift_password}@{redshift_host}:{redshift_port}/{redshift_db}'
)
Session = sessionmaker(bind=engine)
# ==== Copy to Redshift (meter_today) ====
print("⬇️ Copying data into Redshift meter_today...")
s3_path = f"s3://{bucket_name}/{s3_output_key}"
conn = psycopg2.connect(
host=redshift_host,
port=redshift_port,
database=redshift_db,
user=redshift_user,
password=redshift_password)
cursor = conn.cursor()
cursor.execute("TRUNCATE TABLE lightson.meter_today;")
cursor.execute(f"""COPY lightson.meter_today FROM '{s3_path}' CREDENTIALS 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}' FORMAT AS PARQUET;""")
conn.commit()
print("✅ Copied data to meter_today.")
# ==== Merge into meterraw ====
print("🔄 Merging meter_today into meterraw...")
try:
with Session() as session:
# Delete duplicates
delete_result = session.execute(text("""
DELETE FROM lightson.meterraw
WHERE _id IN (
SELECT _id FROM lightson.meterraw
INTERSECT
SELECT _id FROM lightson.meter_today
)
"""))
session.commit()
print(f"🗑️ Deleted rows from meterraw: {delete_result.rowcount}")
# Insert new records
insert_result = session.execute(text("""
INSERT INTO lightson.meterraw (
_id, m, ds, ms, power_w_, ac_power, angle, current_kwh, fan_speed,
freq, mode, serialno, temp, updatedat, createdat, current_phase_1,
current_phase_2, current_phase_3, voltage_phase_1, voltage_phase_2,
voltage_phase_3, power_factor_phase_1, power_factor_phase_2, power_factor_phase_3,
room_temp, room_humid, ac_hs, ac_vs, kwh_5min, v, "timestamp", oem, algo_flag
)
SELECT
_id, m,
CAST(ds AS BOOLEAN),
CAST(ms AS BOOLEAN),
"power(w)" AS power_w_,
ac_power, angle, current_kwh, fan_speed, freq, mode, serialno, temp,
updatedat, createdat, current_phase_1, current_phase_2, current_phase_3,
voltage_phase_1, voltage_phase_2, voltage_phase_3,
power_factor_phase_1, power_factor_phase_2, power_factor_phase_3,
room_temp, room_humid, ac_hs, ac_vs, kwh_5min, v, "timestamp", oem, algo_flag
FROM lightson.meter_today
"""))
session.commit()
print(f"✅ Inserted rows into meterraw: {insert_result.rowcount}")
except Exception as e:
print("❌ Merge failed:", e)
traceback.print_exc()
raise
print("🎉 MeterRaw pipeline complete.")
except Exception as e:
print("🔥 Fatal error in pipeline:", e)
traceback.print_exc()
raise
finally:
# Clean up resources
if 'client' in locals():
client.close()
if 'engine' in locals():
engine.dispose()
print("Resources cleaned up.")
if __name__ == "__main__":
meter_raw_migration()