Data Validation with Pyspark || Schema Comparison || Dynamically || Real Time Scenario

  • Published: 17 Oct 2024
  • In this video we cover a quick data validation check: comparing the schema between source and target (a minimal sketch follows the tags below). In the next video we will look at date/timestamp format checks and a duplicate count check.
    Column Comparison link :
    • Data Validation with P...
    #dataanalytics #dataengineeringessentials #azuredatabricks
    #dataanalysis
    #pyspark
    #pythonprogramming
    #sql
    #databricks #PySpark #Spark #DatabricksNotebook #PySparkLogic
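
    A minimal sketch of the source-vs-target schema check the video covers; the DataFrame names and sample rows below are illustrative assumptions, not taken from the video:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("SchemaCheck").getOrCreate()

    # stand-in source and target tables (assumed data, for illustration only)
    source_df = spark.createDataFrame([(1, "a")], ["id", "name"])
    target_df = spark.createDataFrame([(1, "a")], ["id", "name"])

    # dtypes yields (columnName, typeString) pairs, e.g. ('id', 'bigint')
    src_types = dict(source_df.dtypes)
    tgt_types = dict(target_df.dtypes)

    # report type mismatches and columns missing on either side
    for column in sorted(set(src_types) | set(tgt_types)):
        s = src_types.get(column, "<missing>")
        t = tgt_types.get(column, "<missing>")
        if s != t:
            print(f"Mismatch on {column}: source={s}, target={t}")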

Comments • 10

  • @skateforlife3679 · 10 months ago +2

    Thank you for your work!!! It would be amazing if you could enhance the video with chapters, to give more context to what you explain in the different sections of the video :)!

  • @saibhargavreddy5992 · 5 months ago

    I found this very useful as I had a similar issue with data validations. It helped a lot while completing my project.

  • @vamshimerugu6184 · 5 months ago +1

    I think schema comparison is an important topic in PySpark. Great explanation, sir ❤

  • @avinash7003 · 9 months ago

    Code on GitHub?

    • @DataSpark45 · 9 months ago +1

      Here is the link bro : drive.google.com/drive/folders/1I6rqtiKh1ChM_dkLJwxfxyxcHWPgyiKZ?usp=sharing

  • @amandoshi5803 · 3 months ago

    source code?

    • @DataSpark45 · 3 months ago +1

      from pyspark.sql.functions import col

      def SchemaComparision(controldf, spsession, refdf):
          try:
              # iterate controldf to get each file name and file path
              for x in controldf.collect():
                  filename = x['filename']
                  filepath = x['filepath']
                  # build a dataframe from the file path
                  print("Data frame is being created for {} ({})".format(filename, filepath))
                  dfs = spsession.read.format('csv').option('header', True).option('inferSchema', True).load(filepath)
                  print("DF created for {} ({})".format(filename, filepath))
                  # pick the reference row(s) for this file
                  ref_filter = refdf.filter(col('SrcFileName') == filename)
                  for ref in ref_filter.collect():
                      columnNamesList = [c.strip().lower() for c in ref['SrcColumns'].split(",")]
                      refTypesList = [t.strip().lower() for t in ref['SrcColumnType'].split(",")]
                      # map each dataframe column to its simple type string,
                      # e.g. StringType() -> 'string', IntegerType() -> 'int'
                      dfTypesByName = {f.name.strip().lower(): f.dataType.simpleString().lower()
                                       for f in dfs.schema.fields}
                      dfsTypesList = [dfTypesByName.get(c, 'missing') for c in columnNamesList]
                      # keep only the columns whose dataframe type differs from the reference type
                      missmatchedcolumns = [(col_name, df_type, ref_type)
                                            for (col_name, df_type, ref_type)
                                            in zip(columnNamesList, dfsTypesList, refTypesList)
                                            if df_type != ref_type]
                      if missmatchedcolumns:
                          print("Schema comparison failed (mismatch) for {}".format(filename))
                          for col_name, df_type, ref_type in missmatchedcolumns:
                              print(f"columnName : {col_name}, DataFrameType : {df_type}, referenceType : {ref_type}")
                      else:
                          print("Schema comparison is done and successful for {}".format(filename))
              return True
          except Exception as e:
              print("An error occurred : ", str(e))
              return False
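
      A hypothetical way to call the function above, assuming a notebook `spark` session; the control/reference table layouts are inferred from the code, and the file path and column names are made up for illustration:

      # assumed control table: one row per file to validate
      controldf = spark.createDataFrame(
          [("sales.csv", "/mnt/landing/sales.csv")],
          ["filename", "filepath"])

      # assumed reference table: expected columns and types per file
      refdf = spark.createDataFrame(
          [("sales.csv", "order_id, amount", "int, double")],
          ["SrcFileName", "SrcColumns", "SrcColumnType"])

      SchemaComparision(controldf, spark, refdf)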