204 lines
6.4 KiB
Python
204 lines
6.4 KiB
Python
"""
|
|
Extract rowing machine time and distance from a display photo using Claude's vision API.
|
|
|
|
Mirrors the extraction logic from handle_rowing.go.
|
|
|
|
Usage:
|
|
python extract_rowing_data.py --image path/to/image.jpg
|
|
python extract_rowing_data.py --dir path/to/images/
|
|
"""
|
|
|
|
import argparse
|
|
import base64
|
|
import json
|
|
import mimetypes
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import anthropic
|
|
from PIL import Image
|
|
from PIL.ExifTags import Base as ExifBase
|
|
|
|
|
|
# Image media types that extract_rowing_data accepts.
ALLOWED_MEDIA_TYPES = {
    "image/jpeg",
    "image/png",
    "image/gif",
    "image/webp",
}

# Instruction sent together with the photo; it asks for a bare JSON object.
PROMPT = (
    "Look at this rowing machine display. "
    "Extract the total elapsed time and total distance.\n"
    "\n"
    "Return ONLY a JSON object with these exact keys and numeric values:\n"
    '- "timeMinutes": total minutes (e.g. 2:30 = 2)\n'
    '- "timeSeconds": total seconds (e.g. 2:30 = 30)\n'
    '- "distance": distance in meters as a number (e.g. 5000)\n'
    "\n"
    "No text, no markdown, no explanation. Just the JSON object."
)

# Validation bounds (same as Go handler)
MIN_DISTANCE = 100  # metres
MAX_DISTANCE = 100_000  # metres
MIN_TOTAL_SECS = 30  # 30 seconds
MAX_TOTAL_SECS = 2 * 60 * 60  # 2 hours
MIN_PACE_PER_500M = 80  # seconds, ~1:20 /500m
MAX_PACE_PER_500M = 150  # seconds, ~2:30 /500m
|
|
|
|
|
|
def get_exif_date(image_path: str) -> str | None:
    """Return the EXIF ``DateTime`` value stored in an image, if any.

    Args:
        image_path: Path of the image file to inspect.

    Returns:
        The value found under the EXIF ``DateTime`` tag, or ``None`` when the
        image carries no EXIF data or lacks that tag.

    NOTE(review): this reads the base ``DateTime`` tag; whether that always
    equals the capture time (as opposed to ``DateTimeOriginal``) should be
    confirmed for the cameras in use.
    """
    # Open the image as a context manager so the underlying file handle is
    # released as soon as the metadata has been read, rather than lingering
    # until garbage collection (this matters when scanning a whole directory).
    with Image.open(image_path) as img:
        exif = img.getexif()
        if not exif:
            return None
        return exif.get(ExifBase.DateTime)
|
|
|
|
|
|
def extract_rowing_data(image_path: str) -> dict:
    """
    Ask Claude to read a rowing machine display photo and return the workout figures.

    The file is base64-encoded and sent together with PROMPT. The JSON reply is
    parsed, checked against the module's validation bounds, and extended with
    derived values (total seconds, pace per 500 m, calories) plus the EXIF date
    returned by get_exif_date.

    Returns dict with keys: timeMinutes, timeSeconds, distance, totalSeconds,
    timePer500m, calories, dateTaken.

    Raises ValueError for an unsupported media type or a reading outside the
    validation bounds, and RuntimeError when the reply contains no content.
    Errors from reading the file, the API call, or parsing the reply propagate
    unchanged.
    """
    image_file = Path(image_path)
    media_type, _ = mimetypes.guess_type(str(image_file))
    if media_type not in ALLOWED_MEDIA_TYPES:
        raise ValueError(f"Unsupported image type: {media_type}")

    image_b64 = base64.standard_b64encode(image_file.read_bytes()).decode("utf-8")

    image_block = {
        "type": "image",
        "source": {
            "type": "base64",
            "media_type": media_type,
            "data": image_b64,
        },
    }
    text_block = {"type": "text", "text": PROMPT}

    client = anthropic.Anthropic()
    message = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=256,
        messages=[{"role": "user", "content": [image_block, text_block]}],
    )

    if not message.content:
        raise RuntimeError("Empty response from Claude")

    # The prompt asks for bare JSON, but tolerate a markdown code fence anyway.
    reply = message.content[0].text.strip()
    reply = reply.removeprefix("```json").removeprefix("```")
    reply = reply.removesuffix("```").strip()

    payload = json.loads(reply)
    time_minutes, time_seconds, distance = (
        int(payload[key]) for key in ("timeMinutes", "timeSeconds", "distance")
    )

    if distance == 0:
        raise ValueError("Invalid distance: 0")

    total_seconds = time_minutes * 60 + time_seconds

    # Reject readings that fall outside the plausible ranges.
    if distance < MIN_DISTANCE or distance > MAX_DISTANCE:
        raise ValueError(f"Anomalous distance: {distance}")
    if total_seconds < MIN_TOTAL_SECS or total_seconds > MAX_TOTAL_SECS:
        raise ValueError(f"Anomalous time: {total_seconds}s")

    per_500m = total_seconds / distance * 500.0
    if per_500m < MIN_PACE_PER_500M or per_500m > MAX_PACE_PER_500M:
        raise ValueError(f"Anomalous pace: {per_500m:.1f}s /500m")

    calories = distance / 7500.0 * 500.0

    return {
        "timeMinutes": time_minutes,
        "timeSeconds": time_seconds,
        "distance": distance,
        "totalSeconds": total_seconds,
        "timePer500m": round(per_500m, 2),
        "calories": round(calories, 2),
        "dateTaken": get_exif_date(image_path),
    }
|
|
|
|
|
|
# Splits a file name into base, optional " (N)" version suffix, and extension.
_VERSION_RE = re.compile(r"^(?P<base>.+?)(?:\s+\((?P<num>\d+)\))?(?P<ext>\.\w+)$")


def _keep_latest_versions(paths: list[Path]) -> list[Path]:
    """Drop superseded copies, keeping the highest " (N)" version per file name.

    For example, out of IMG_5454_screen.jpg, IMG_5454_screen (1).jpg and
    IMG_5454_screen (3).jpg only IMG_5454_screen (3).jpg survives. A name
    without a numeric suffix is treated as version 0. When two paths share the
    same version, the one seen first is kept.

    Returns the surviving paths in sorted order.
    """
    winners: dict[str, tuple[int, Path]] = {}
    for candidate in paths:
        match = _VERSION_RE.match(candidate.name)
        if match is None:
            # Names the pattern cannot split are grouped under their full
            # name; the first occurrence wins.
            if candidate.name not in winners:
                winners[candidate.name] = (0, candidate)
            continue

        key = f"{match['base']}{match['ext']}"
        version = int(match["num"] or 0)
        current = winners.get(key)
        if current is None or version > current[0]:
            winners[key] = (version, candidate)

    survivors = [path for _, path in winners.values()]
    survivors.sort()
    return survivors
|
|
|
|
|
|
def main():
    """Command-line entry point: extract and print rowing data for photos.

    Exactly one of ``--image`` (a single file) or ``--dir`` (a directory whose
    image files are processed, keeping only the latest " (N)" version of each
    name) must be given. Results are printed per image; a failure on one image
    is reported as an ``ERROR:`` line and does not stop the remaining images.

    Exits with status 2 on invalid arguments (including a ``--dir`` that is not
    a directory) and status 1 when the directory contains no images.
    """
    parser = argparse.ArgumentParser(description="Extract rowing data from display photos")
    parser.add_argument("--image", help="Path to a single image")
    parser.add_argument("--dir", help="Path to a directory of images")
    args = parser.parse_args()

    if not args.image and not args.dir:
        parser.error("Provide --image or --dir")
    if args.image and args.dir:
        parser.error("--image and --dir are mutually exclusive")

    image_exts = {".jpg", ".jpeg", ".png", ".gif", ".webp"}

    if args.dir:
        dir_path = Path(args.dir)
        # Report a bad directory as a usage error instead of a traceback.
        if not dir_path.is_dir():
            parser.error(f"Not a directory: {args.dir}")
        # Only regular files count; a sub-directory named "x.jpg" is skipped.
        all_images = sorted(
            p
            for p in dir_path.iterdir()
            if p.is_file() and p.suffix.lower() in image_exts
        )
        if not all_images:
            print(f"No images found in {args.dir}")
            sys.exit(1)
        image_paths = _keep_latest_versions(all_images)
    else:
        image_paths = [Path(args.image)]

    for img_path in image_paths:
        print(f"\n {img_path.name}:")
        try:
            result = extract_rowing_data(str(img_path))
            mins = result["timeMinutes"]
            secs = result["timeSeconds"]
            dist = result["distance"]
            # Round to tenths of a second *before* splitting into minutes and
            # seconds, so a pace such as 119.97 s prints as 2:00.0 instead of
            # the impossible 1:60.0.
            pace_tenths = round(result["timePer500m"] * 10)
            pace_m, rem_tenths = divmod(pace_tenths, 600)
            pace_s = rem_tenths / 10
            print(f" Time: {mins}:{secs:02d}")
            print(f" Distance: {dist}m")
            print(f" Pace: {pace_m}:{pace_s:04.1f} /500m")
            print(f" Calories: {result['calories']:.0f}")
            if result["dateTaken"]:
                print(f" Date: {result['dateTaken']}")
        except Exception as e:
            # Top-level boundary: report the failure and move on to the next image.
            print(f" ERROR: {e}")


if __name__ == "__main__":
    main()
|