# @replace DATA_SOURCES
DATA_SOURCES = {"datagrepper-topics": "/home/jovyan/work/bus2parquet/output_parquets"}
parquet_dir = DATA_SOURCES["datagrepper-topics"]
topic = "org.fedoraproject.prod.discourse.topic.topic_created"

# First day of the current month, minus 12 months — the lower bound for the
# (currently disabled) date filter below.
cutoff_date = (pd.Timestamp.now().replace(day=1) - pd.DateOffset(months=12)).date()

# Collect non-empty parquet files for the topic. Filenames are expected to
# look like "fedora-YYYYMMDD...-<topic>.parquet"; the embedded date is parsed
# defensively so a single unexpectedly-named file cannot crash the whole scan
# (the original unguarded strptime would raise even though the date filter is
# commented out).
files = []
for p in Path(f"{parquet_dir}/{topic}").glob("fedora-*.parquet"):
    stem = p.stem.replace(f"-{topic}", "")
    try:
        d = datetime.strptime(stem.split("-")[1], "%Y%m%d").date()
    except (IndexError, ValueError):
        d = None  # unparsable filename: keep the file, just skip date filtering
    # if d is not None and d >= cutoff_date and p.stat().st_size > 0:
    if p.stat().st_size > 0:
        files.append(str(p))
local_fs = fs.LocalFileSystem()  # NOTE(review): unused in this section — confirm nothing below this chunk needs it before removing
tables = []

# First pass: discover the union of all column names across the files.
# Only the parquet footer metadata is needed for this, so read just the
# schema — pq.read_table would materialize every row of every file twice
# (once here, once in the second pass).
all_fields = {}
for f in files:
    try:
        schema = pq.read_schema(f)
        for name in schema.names:
            all_fields[name] = pa.string()  # force everything to string
    except Exception as e:
        print(f"[WARN] Skipping {f}: {e}")

# Unified schema: every column ever observed, typed as string, sorted by name.
unified_schema = pa.schema([pa.field(name, pa.string()) for name in sorted(all_fields)])
# Second pass: load each file and reshape it onto the unified schema so all
# tables can be concatenated. Columns present in the file are cast to string;
# columns the file lacks are filled with null strings.
for f in files:
    try:
        tbl = pq.read_table(f)
        # Cast every column the file actually has.
        casted = {name: tbl[name].cast(pa.string()) for name in tbl.schema.names}
        # Pad with null string columns for anything the file is missing.
        for name in unified_schema.names:
            if name not in casted:
                casted[name] = pa.array([None] * len(tbl), type=pa.string())
        # Assemble columns in the unified schema's order.
        aligned = pa.table([casted[name] for name in unified_schema.names], schema=unified_schema)
        tables.append(aligned)
    except Exception as e:
        print(f"[WARN] Skipping {f}: {e}")
# Concatenate and hand off to pandas. Every table was already cast to the
# identical unified_schema, so no schema promotion is needed — the old
# `promote=True` kwarg is deprecated in pyarrow >= 14 (its replacement is
# promote_options="default", which would be a no-op here anyway).
if tables:
    table = pa.concat_tables(tables)
    df = table.to_pandas()
    print(f"Loaded {len(df)} records from {len(tables)} tables.")
else:
    print("No valid parquet files found")