Data Validation with Pyspark || Schema Comparison || Dynamically || Real Time Scenario
- Published: 17 Oct 2024
- In this video we cover how to perform a quick data validation: a schema comparison between source and target. In the next video we will look into a date/timestamp format check and a duplicate count check.
Column Comparison link :
• Data Validation with P...
#dataanalytics #dataengineeringessentials #azuredatabricks #dataanalysis #pyspark #pythonprogramming #sql #databricks #Spark #DatabricksNotebook #PySparkLogic
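As a minimal, Spark-free sketch of the idea in the video (all names here, like compare_schemas, are illustrative assumptions; schemas are represented as (column, type) pairs, the shape PySpark's DataFrame.dtypes returns):

```python
# Minimal sketch: compare a source vs. target schema, each given as a list
# of (column_name, type_string) pairs. Names and sample data are illustrative.

def compare_schemas(source_schema, target_schema):
    """Return columns missing on either side and columns whose types differ."""
    src = {name.lower(): dtype.lower() for name, dtype in source_schema}
    tgt = {name.lower(): dtype.lower() for name, dtype in target_schema}
    return {
        "missing_in_target": sorted(set(src) - set(tgt)),
        "missing_in_source": sorted(set(tgt) - set(src)),
        "type_mismatches": [
            (c, src[c], tgt[c])
            for c in sorted(set(src) & set(tgt))
            if src[c] != tgt[c]
        ],
    }

source = [("Row ID", "int"), ("Order Date", "string"), ("Sales", "double")]
target = [("Row ID", "int"), ("Order Date", "date"), ("Profit", "double")]
print(compare_schemas(source, target))
```

With the sample schemas above, the report flags "sales" as missing in the target, "profit" as missing in the source, and "order date" as a string-vs-date type mismatch.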
Thank you for your work!!! It would be amazing if you could enhance the video with "chapters" to add more context to what you explain in the different sections of the video :)!
Great suggestion!
I found this very useful as I had a similar issue with data validations. It helped a lot while completing my project.
Glad it helped!
I think schema comparison is an important topic in PySpark. Great explanation, sir ❤
thank you bro
code in github?
Here is the link bro : drive.google.com/drive/folders/1I6rqtiKh1ChM_dkLJwxfxyxcHWPgyiKZ?usp=sharing
source code ?
from pyspark.sql.functions import col

def SchemaComparison(controldf, spsession, refdf):
    try:
        # iterate over controldf to get each filename and filepath
        for x in controldf.collect():
            filename = x['filename']
            filepath = x['filepath']
            # create a dataframe from the filepath
            print("Creating data frame for {} ({})".format(filepath, filename))
            dfs = spsession.read.format('csv').option('header', True).option('inferSchema', True).load(filepath)
            print("DF created for {} ({})".format(filepath, filename))
            # look up the reference (expected) schema row for this file
            ref_filter = refdf.filter(col('SrcFileName') == filename)
            for ref_row in ref_filter.collect():
                columnNames = ref_row['SrcColumns']      # comma-separated column names
                refTypes = ref_row['SrcColumnType']      # comma-separated expected types
                columnNamesList = [c.strip().lower() for c in columnNames.split(",")]
                refTypesList = [t.strip().lower() for t in refTypes.split(",")]
                # actual types from the dataframe, keyed by lower-cased column name;
                # simpleString() maps StringType() -> 'string', IntegerType() -> 'int'
                dfsSchema = {f.name.strip().lower(): f.dataType.simpleString().lower() for f in dfs.schema.fields}
                dfsTypesList = [dfsSchema.get(c) for c in columnNamesList]
                # compare each column's actual type against its reference type
                mismatchedColumns = [(col_name, df_type, ref_type)
                                     for (col_name, df_type, ref_type) in zip(columnNamesList, dfsTypesList, refTypesList)
                                     if df_type != ref_type]
                if mismatchedColumns:
                    print("Schema comparison failed (mismatch) for {}".format(filename))
                    for col_name, df_type, ref_type in mismatchedColumns:
                        print(f"columnName : {col_name}, DataFrameType : {df_type}, referenceType : {ref_type}")
                else:
                    print("Schema comparison done successfully for {}".format(filename))
        return True
    except Exception as e:
        print("An error occurred : ", str(e))
        return False
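The core check in the function boils down to zipping the expected and actual type lists column by column and keeping only the disagreements. As a standalone, Spark-free sketch (the sample column names and types are illustrative, not from the video):

```python
# Spark-free sketch of the per-column type check: zip column names with the
# actual types (as dfs.schema + simpleString() would yield) and the reference
# types (parsed from the control table), keeping only mismatches.
column_names = ["row id", "order id", "sales"]     # parsed from SrcColumns
actual_types = ["int", "string", "string"]         # from the loaded dataframe
reference_types = ["int", "string", "double"]      # parsed from SrcColumnType

mismatched = [
    (name, actual, expected)
    for name, actual, expected in zip(column_names, actual_types, reference_types)
    if actual != expected
]
print(mismatched)  # [('sales', 'string', 'double')]
```

Here only "sales" is reported, because its inferred type (string) disagrees with the reference type (double) while the other two columns match.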