From afa17914ae80c3756bd6a640a339d82af78f4ec2 Mon Sep 17 00:00:00 2001 From: sonali-shaw Date: Thu, 11 Jun 2026 10:28:34 -0400 Subject: [PATCH 1/3] Add Difference class with diff --- pfb_to_zip/pfb_to_zip.py | 88 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 80 insertions(+), 8 deletions(-) diff --git a/pfb_to_zip/pfb_to_zip.py b/pfb_to_zip/pfb_to_zip.py index 97a45ec..753f907 100644 --- a/pfb_to_zip/pfb_to_zip.py +++ b/pfb_to_zip/pfb_to_zip.py @@ -15,6 +15,7 @@ from pfb.reader import PFBReader from pfb.exporters import tsv from dictionaryutils.utils import node_values_to_codes +from pfb.writer import PFBWriter def to_folder_name(value: str) -> str: @@ -28,8 +29,6 @@ class Config(): def __init__(self): self.reader = None - - class PFBExporter: def __init__(self, pfb_file_path:str, tmp_folder:str, output_path:str, config_file_path:str, ontology:str=None, extra_analysis:str=None) -> None: self.pfb_file_path = pfb_file_path @@ -77,7 +76,6 @@ def __new__(cls, *args, **kwargs): if cls._validate(*args, **kwargs): return super().__new__(cls) - def initialize(self): print("Initializing...") @@ -316,7 +314,6 @@ def zip(self): self.zip_file_output_path = make_archive(output_filename, 'zip', self.tmp_folder, self.zip_subfolder) print(f"ZIP file created at {self.zip_file_output_path}") - def clean_up(self): print(f"Removing all the resources used and the temporary data directory at {self.tmp_folder}") @@ -335,11 +332,83 @@ def clean_up(self): print(e) +class Difference: + def __init__(self, old_file_path: str, new_file_path: str): + self.old_file = old_file_path + self.new_file = new_file_path + + def read_avro(self, file_path): + records_by_submitter = {} + submitter_ids_list = [] + + with open(file_path, 'rb') as f: + with PFBReader(f) as reader: + for record in reader: + submitter_id = record['object'].get('submitter_id') + if submitter_id: # program etc. don't have submitter_id + submitter_ids_list.append(submitter_id) + records_by_submitter[submitter_id] = record + + # duplicates check + submitter_ids_set = set(submitter_ids_list) + if len(submitter_ids_list) != len(submitter_ids_set): + print("⚠️ Duplicates found!") + + return records_by_submitter + + def generate_diff(self, output_path: str): + old = self.read_avro(self.old_file) + new = self.read_avro(self.new_file) + + deleted_submitter_ids = set(old.keys()) - set(new.keys()) + diff_records = [] + log_lines = [] + + if deleted_submitter_ids: + msg = f" {len(deleted_submitter_ids)} record(s) removed (consent withdrawn):" + print(msg) + log_lines.append(msg) + for sid in sorted(deleted_submitter_ids): + line = f" - {sid}" + print(line) + log_lines.append(line) + else: + msg = "No records removed." + print(msg) + log_lines.append(msg) + + for submitter_id, new_record in new.items(): + if submitter_id not in old: + diff_records.append(new_record) + else: + if new_record['object'] != old[submitter_id]['object']: + diff_records.append(new_record) + + summary = f"\n Total changed/new records in diff: {len(diff_records)}" + print(summary) + log_lines.append(summary) + + # Save log to markdown + log_path = output_path.replace('.avro', '_log.md') + with open(log_path, 'w') as f: + f.write("# Diff Log\n\n") + f.write("\n".join(log_lines)) + print(f"Log saved to {log_path}") + + with open(output_path, 'wb') as out_f: + with PFBWriter(out_f) as writer: + with open(self.new_file, 'rb') as schema_f: + with PFBReader(schema_f) as reader: + writer.copy_schema(reader) + writer.write(diff_records) + + print(f"Written to {output_path} with {len(diff_records)} records") def main(): # EXAMPLE: python pfb_to_zip.py -i ./export_2023-03-27T02_42_17.avro -o ./outputs/ -c ./config.py -d https://portal.pedscommons.org/api/v0/submission/_dictionary/_all -t ncit + # Example for diff: python3 pfb_to_zip.py -i ../new_data.avro -o ../outputs/ -c ./config.py -d ../old_data.avro parser = argparse.ArgumentParser(description="Build ZIP bundle for data delivery after project request has been approved") parser.add_argument('-c', '--config', help='The config file') @@ -347,6 +416,7 @@ def main(): parser.add_argument('-o', '--output', help='Output ZIP directory') parser.add_argument('-t', '--terminology', help='The ontology you want to transform GEN3 values to.') parser.add_argument('-a', '--analysis', help='The consorti script you want to execute. Ex:INRG') + parser.add_argument('-d', '--diff', help='Path to the old avro file to diff against') try: args = parser.parse_args() @@ -359,6 +429,11 @@ def main(): print(err) parser.print_help() + if args.diff: + diff = Difference(args.diff, input_path) + diff_output = output_path + 'diff.avro' + diff.generate_diff(diff_output) + input_path = diff_output tmp_folder = "./tmp" pfb_export = PFBExporter(input_path, tmp_folder, output_path, config_file, ontology, True if analysis_script_consortia and analysis_script_consortia != "" else False) @@ -371,14 +446,11 @@ def main(): pfb_export.filter_attributes(is_black_list=False) # pfb_export.filter_attributes(is_black_list=True) pfb_export.setup_and_run_analysis(analysis_script_consortia) #TODO how to find consortium - pfb_export.to_ontology_code() + pfb_export.to_ontology_code() pfb_export.add_external_references_material() pfb_export.zip() pfb_export.clean_up() - - - if __name__ == "__main__": main() From 6ef8ad83771cf9c310a0a7e460bd3b64a40888ed Mon Sep 17 00:00:00 2001 From: sonali-shaw Date: Tue, 16 Jun 2026 10:27:53 -0500 Subject: [PATCH 2/3] revised diff --- pfb_to_zip/pfb_to_zip.py | 127 +++++++++++++++++++++++++++++++-------- 1 file changed, 101 insertions(+), 26 deletions(-) diff --git a/pfb_to_zip/pfb_to_zip.py b/pfb_to_zip/pfb_to_zip.py index 753f907..8b57aeb 100644 --- a/pfb_to_zip/pfb_to_zip.py +++ b/pfb_to_zip/pfb_to_zip.py @@ -338,62 +338,85 @@ def __init__(self, old_file_path: str, new_file_path: str): self.new_file = new_file_path def read_avro(self, file_path): - records_by_submitter = {} - submitter_ids_list = [] + records_by_subject = {} with open(file_path, 'rb') as f: with PFBReader(f) as reader: for record in reader: - submitter_id = record['object'].get('submitter_id') - if submitter_id: # program etc. don't have submitter_id - submitter_ids_list.append(submitter_id) - records_by_submitter[submitter_id] = record + obj = record['object'] + name = record['name'] - # duplicates check - submitter_ids_set = set(submitter_ids_list) - if len(submitter_ids_list) != len(submitter_ids_set): - print("⚠️ Duplicates found!") - - return records_by_submitter + if name == 'subject': + sid = obj.get('submitter_id') + else: + sid = obj.get('subjects.submitter_id') + + if sid: + if sid not in records_by_subject: + records_by_subject[sid] = {} + if name not in records_by_subject[sid]: + records_by_subject[sid][name] = [] + records_by_subject[sid][name].append(record) + + return records_by_subject + + # removes created and updated times from data for comparison + def _strip_timestamps(self, nodes): + stripped = {} + for node_name, records in nodes.items(): + stripped[node_name] = [] + for r in records: + stripped_record = r.copy() + obj = r['object'].copy() + obj.pop('created_datetime', None) + obj.pop('updated_datetime', None) + stripped_record['object'] = obj + stripped[node_name].append(stripped_record) + return stripped def generate_diff(self, output_path: str): old = self.read_avro(self.old_file) new = self.read_avro(self.new_file) - deleted_submitter_ids = set(old.keys()) - set(new.keys()) + deleted_sids = set(old.keys()) - set(new.keys()) diff_records = [] log_lines = [] - if deleted_submitter_ids: - msg = f" {len(deleted_submitter_ids)} record(s) removed (consent withdrawn):" + if deleted_sids: + msg = f"{len(deleted_sids)} subject(s) removed:" print(msg) log_lines.append(msg) - for sid in sorted(deleted_submitter_ids): + for sid in sorted(deleted_sids): line = f" - {sid}" print(line) log_lines.append(line) + for records in old[sid].values(): + for record in records: + line = f" - {record['name']}: {record['object']}" + print(line) + log_lines.append(line) else: - msg = "No records removed." + msg = "No subjects removed." print(msg) log_lines.append(msg) - for submitter_id, new_record in new.items(): - if submitter_id not in old: - diff_records.append(new_record) + for sid, new_nodes in new.items(): + if sid not in old: + for records in new_nodes.values(): + diff_records.extend(records) else: - if new_record['object'] != old[submitter_id]['object']: - diff_records.append(new_record) + if self._strip_timestamps(new_nodes) != self._strip_timestamps(old[sid]): + for records in new_nodes.values(): + diff_records.extend(records) - summary = f"\n Total changed/new records in diff: {len(diff_records)}" + summary = f"Total changed/new records in diff: {len(diff_records)}" print(summary) log_lines.append(summary) - # Save log to markdown log_path = output_path.replace('.avro', '_log.md') with open(log_path, 'w') as f: f.write("# Diff Log\n\n") f.write("\n".join(log_lines)) - print(f"Log saved to {log_path}") with open(output_path, 'wb') as out_f: with PFBWriter(out_f) as writer: @@ -404,6 +427,57 @@ def generate_diff(self, output_path: str): print(f"Written to {output_path} with {len(diff_records)} records") + def create_test_diff_avro(self, output_path: str): + import copy + + all_records = [] + modified_count = 0 + removed_sids = set() + + # Read all records and group by subject + records_by_subject = self.read_avro(self.old_file) + all_sids = list(records_by_subject.keys()) + + # Pick subjects to modify, remove + sids_to_modify = all_sids[:3] # modify first 3 subjects + sids_to_remove = all_sids[3:5] # remove next 2 subjects (simulate consent withdrawal) + + with open(self.old_file, 'rb') as f: + with PFBReader(f) as reader: + for record in reader: + obj = record['object'] + name = record['name'] + + # find which subject this record belongs to + if name == 'subject': + sid = obj.get('submitter_id') + else: + sid = obj.get('subjects.submitter_id') + + # skip removed subjects entirely + if sid in sids_to_remove: + continue + + new_record = copy.deepcopy(record) + + # modify records belonging to selected subjects + if sid in sids_to_modify: + if name == 'subject': + new_record['object']['age_at_censor_status'] = 99.99 + modified_count += 1 + + all_records.append(new_record) + + with open(output_path, 'wb') as out_f: + with PFBWriter(out_f) as writer: + with open(self.old_file, 'rb') as schema_f: + with PFBReader(schema_f) as reader: + writer.copy_schema(reader) + writer.write(all_records) + + print(f"Written to {output_path}") + print(f" - {len(sids_to_modify)} subjects modified") + print(f" - {len(sids_to_remove)} subjects removed: {sorted(sids_to_remove)}") def main(): @@ -454,5 +528,6 @@ def main(): if __name__ == "__main__": main() - + # d = Difference('../ex1.avro', '../ex1.avro') + # d.create_test_diff_avro('../ex2.avro') From 876a3c701ff97bf472df0096961b4cce9ff6ea2b Mon Sep 17 00:00:00 2001 From: sonali-shaw Date: Tue, 30 Jun 2026 12:45:06 -0500 Subject: [PATCH 3/3] Add normalize method for sorting before comparison --- pfb_to_zip/pfb_to_zip.py | 67 +++++++--------------------------------- 1 file changed, 11 insertions(+), 56 deletions(-) diff --git a/pfb_to_zip/pfb_to_zip.py b/pfb_to_zip/pfb_to_zip.py index 8b57aeb..f12ba16 100644 --- a/pfb_to_zip/pfb_to_zip.py +++ b/pfb_to_zip/pfb_to_zip.py @@ -6,6 +6,7 @@ import re import subprocess import csv +import json from shutil import rmtree, make_archive, copy, copytree import requests @@ -374,6 +375,15 @@ def _strip_timestamps(self, nodes): stripped[node_name].append(stripped_record) return stripped + def _normalize(self, nodes): + normalized = self._strip_timestamps(nodes) + for node_name in normalized: + normalized[node_name] = sorted( + normalized[node_name], + key=lambda r: json.dumps(r, sort_keys=True) + ) + return normalized + def generate_diff(self, output_path: str): old = self.read_avro(self.old_file) new = self.read_avro(self.new_file) @@ -405,7 +415,7 @@ def generate_diff(self, output_path: str): for records in new_nodes.values(): diff_records.extend(records) else: - if self._strip_timestamps(new_nodes) != self._strip_timestamps(old[sid]): + if self._normalize(new_nodes) != self._normalize(old[sid]): for records in new_nodes.values(): diff_records.extend(records) @@ -427,59 +437,6 @@ def generate_diff(self, output_path: str): print(f"Written to {output_path} with {len(diff_records)} records") - def create_test_diff_avro(self, output_path: str): - import copy - - all_records = [] - modified_count = 0 - removed_sids = set() - - # Read all records and group by subject - records_by_subject = self.read_avro(self.old_file) - all_sids = list(records_by_subject.keys()) - - # Pick subjects to modify, remove - sids_to_modify = all_sids[:3] # modify first 3 subjects - sids_to_remove = all_sids[3:5] # remove next 2 subjects (simulate consent withdrawal) - - with open(self.old_file, 'rb') as f: - with PFBReader(f) as reader: - for record in reader: - obj = record['object'] - name = record['name'] - - # find which subject this record belongs to - if name == 'subject': - sid = obj.get('submitter_id') - else: - sid = obj.get('subjects.submitter_id') - - # skip removed subjects entirely - if sid in sids_to_remove: - continue - - new_record = copy.deepcopy(record) - - # modify records belonging to selected subjects - if sid in sids_to_modify: - if name == 'subject': - new_record['object']['age_at_censor_status'] = 99.99 - modified_count += 1 - - all_records.append(new_record) - - with open(output_path, 'wb') as out_f: - with PFBWriter(out_f) as writer: - with open(self.old_file, 'rb') as schema_f: - with PFBReader(schema_f) as reader: - writer.copy_schema(reader) - writer.write(all_records) - - print(f"Written to {output_path}") - print(f" - {len(sids_to_modify)} subjects modified") - print(f" - {len(sids_to_remove)} subjects removed: {sorted(sids_to_remove)}") - - def main(): # EXAMPLE: python pfb_to_zip.py -i ./export_2023-03-27T02_42_17.avro -o ./outputs/ -c ./config.py -d https://portal.pedscommons.org/api/v0/submission/_dictionary/_all -t ncit # Example for diff: python3 pfb_to_zip.py -i ../new_data.avro -o ../outputs/ -c ./config.py -d ../old_data.avro @@ -528,6 +485,4 @@ def main(): if __name__ == "__main__": main() - # d = Difference('../ex1.avro', '../ex1.avro') - # d.create_test_diff_avro('../ex2.avro')