
Carrier bills

finalize_upload() async

Finalize the upload process. Once all batches are uploaded, merge them once with the existing data in LakeFS, store the result, and clear the accumulator.

Returns:

Name               Type  Description
message            str   A message indicating the number of shipments processed.
lakefs_path        str   The path to the merged data in LakeFS.
records_processed  int   The number of newly uploaded records merged in this call.
existing_records   int   The number of records already in LakeFS before the merge.
total_records      int   The total number of records in LakeFS after the merge (existing + new).
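
As a usage sketch, a client calls this endpoint once after the last /upload_shipments batch and reads the fields above from the JSON response. The base URL and the httpx client below are placeholders; the actual mount path of carrier_bills_router is not shown on this page.

import httpx

BASE = "http://localhost:8000/api/carrier_bills"  # placeholder mount path, not taken from this page

resp = httpx.post(f"{BASE}/finalize_upload", timeout=60.0)
resp.raise_for_status()
result = resp.json()
print(result["message"])
print(f'{result["records_processed"]} new + {result["existing_records"]} existing = {result["total_records"]} total')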

Source code in mycxo/boxtalk/routes/api/carrier_bills/route.py
@carrier_bills_router.post("/finalize_upload")
async def finalize_upload():
	"""
	 Finalize the upload process.
	 Once all batches are uploaded, merge them once with the existing data in LakeFS, store the result, and clear the accumulator.

	Parameters:
	    None

	Returns:
	    message (str): A message indicating the number of shipments processed.
	    lakefs_path (str): The path to the merged data in LakeFS.
	    records_processed (int): The number of records processed.
	    existing_records (int): The number of existing records in LakeFS.
	    total_records (int): The total number of records in LakeFS.
	"""

	# Shipment model fields (for reference):
	# id: int
	# tracking_number: Optional[str] = None
	# customer_name: Optional[str] = None
	# order_number: Optional[str] = None
	# shipstream_order_id: Optional[str] = None
	# carrier: str
	# shipping_method: Optional[str] = None
	# shipping_date: Optional[date] = None
	# shipping_cost: Optional[float] = None
	# marked_up_cost: Optional[float] = None
	# shipped_weight: Optional[str] = None
	# billed_weight: Optional[str] = None
	# warehouse: Optional[str] = None
	# duties_taxes: int = 0
	# zone: Optional[str] = None
	# status: Optional[str] = None
	# created_at: datetime
	# updated_at: datetime
	try:
		global accumulator

		# Log the number of accumulated shipments
		print(f"Finalizing upload with {len(accumulator)} shipments...")

		# 1) Get existing data from LakeFS (if any)
		print("Fetching existing data from LakeFS...")
		existing_data = await get_carrier_bill_data_from_lakefs("carrier_bills_shipments")
		existing_count = len(existing_data) if not existing_data.empty else 0
		print(f"Found {existing_count} existing records in LakeFS.")

		# 2) Convert in-memory shipments (accumulator) to Parquet & merge
		print("Starting Parquet conversion...")
		parquet_buffer = io.BytesIO()
		convert_shipments_to_parquet(accumulator, parquet_buffer, existing_data)
		parquet_buffer.seek(0)  # Reset buffer position
		print("Successfully converted shipments to Parquet.")

		# 3) Save merged data to LakeFS
		print("Saving to LakeFS...")
		parquet_data = parquet_buffer.getvalue()
		path = await process_carrier_bill_data_for_lakefs(parquet_data, "carrier_bills_shipments")
		print(f"Saved merged data to LakeFS at path: {path}")

		total_new = len(accumulator)

		# 4) Clear the accumulator (we're done with these shipments)
		print("Clearing accumulator...")
		accumulator.clear()
		print("Accumulator cleared.")

		return {
			"message": "Shipments processed successfully",
			"lakefs_path": path,
			"records_processed": total_new,
			"existing_records": existing_count,
			"total_records": existing_count + total_new,
		}

	except ValidationError as e:
		print("Validation error:", e.json())
		raise HTTPException(status_code=422, detail=e.errors())
	except Exception as e:
		print("Unexpected error in finalize_upload:", str(e))
		traceback.print_exc()
		raise HTTPException(status_code=500, detail=str(e))
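
The merge itself is delegated to convert_shipments_to_parquet, which is imported from elsewhere and not shown on this page. A minimal sketch of that kind of merge, assuming pandas DataFrames, Pydantic shipment models, and de-duplication on an id column (all assumptions; the real helper may differ):

import io

import pandas as pd

def convert_shipments_to_parquet(shipments, buffer: io.BytesIO, existing_data: pd.DataFrame) -> None:
    # Illustrative sketch only: flatten the accumulated Pydantic models into rows.
    new_rows = pd.DataFrame([s.model_dump() for s in shipments])  # use .dict() on Pydantic v1
    merged = new_rows if existing_data.empty else pd.concat([existing_data, new_rows], ignore_index=True)
    # Keep the latest version of each shipment if ids repeat across uploads.
    if "id" in merged.columns:
        merged = merged.drop_duplicates(subset="id", keep="last")
    merged.to_parquet(buffer, index=False)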

upload_shipments(data) async

Append the received shipments to our global in-memory list. No immediate merge with LakeFS — we do that once in finalize_upload.

Parameters:

Name  Type            Description                                                            Default
data  ShipmentUpload  Shipment upload model; data.shipments (List[Shipment]) contains the   required
                      shipments to append to the accumulator.

Returns:

Name               Type  Description
message            str   A message indicating the number of shipments received.
total_accumulated  int   The total number of shipments accumulated so far.
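
A usage sketch of the batching side, assuming the same placeholder base URL as above and that ShipmentUpload serializes as a JSON object with a shipments array (inferred from data.shipments in the handler below):

import httpx

BASE = "http://localhost:8000/api/carrier_bills"  # placeholder mount path
BATCH_SIZE = 500  # illustrative batch size

def upload_in_batches(shipment_dicts: list[dict]) -> None:
    # Send shipments in fixed-size batches; nothing touches LakeFS until finalize_upload is called.
    for start in range(0, len(shipment_dicts), BATCH_SIZE):
        batch = shipment_dicts[start:start + BATCH_SIZE]
        r = httpx.post(f"{BASE}/upload_shipments", json={"shipments": batch}, timeout=30.0)
        r.raise_for_status()
        print(f'{r.json()["total_accumulated"]} shipment(s) accumulated so far')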

Source code in mycxo/boxtalk/routes/api/carrier_bills/route.py
@carrier_bills_router.post("/upload_shipments")
async def upload_shipments(data: ShipmentUpload):
	"""
	Append the received shipments to our global in-memory list.
	No immediate merge with LakeFS — we do that once in finalize_upload.

	Parameters:
	    data (ShipmentUpload): Shipment upload model; data.shipments (List[Shipment]) contains the shipments to append to the accumulator.

	Returns:
	    message (str): A message indicating the number of shipments received.
	    total_accumulated (int): The total number of shipments accumulated so far.
	"""
	try:
		global accumulator
		accumulator.extend(data.shipments)
		return {"message": f"Batch received with {len(data.shipments)} shipment(s).", "total_accumulated": len(accumulator)}
	except ValidationError as e:
		print("Validation error:", e.json())
		raise HTTPException(status_code=422, detail=e.errors())
	except Exception as e:
		print("Unexpected error in upload_shipments:", str(e))
		traceback.print_exc()
		raise HTTPException(status_code=500, detail=str(e))
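
ShipmentUpload and Shipment are imported from elsewhere and are not defined on this page. The commented-out field list inside finalize_upload suggests a shape along these lines; the field names come from that comment, but treat the definitions below as a sketch rather than the actual models:

from datetime import date, datetime
from typing import List, Optional

from pydantic import BaseModel


class Shipment(BaseModel):
    # Field names mirror the commented-out list in finalize_upload; defaults are assumptions.
    id: int
    tracking_number: Optional[str] = None
    customer_name: Optional[str] = None
    order_number: Optional[str] = None
    shipstream_order_id: Optional[str] = None
    carrier: str
    shipping_method: Optional[str] = None
    shipping_date: Optional[date] = None
    shipping_cost: Optional[float] = None
    marked_up_cost: Optional[float] = None
    shipped_weight: Optional[str] = None
    billed_weight: Optional[str] = None
    warehouse: Optional[str] = None
    duties_taxes: int = 0
    zone: Optional[str] = None
    status: Optional[str] = None
    created_at: datetime
    updated_at: datetime


class ShipmentUpload(BaseModel):
    shipments: List[Shipment]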