diff --git a/.env.example b/.env.example index 468461a..0f6cd6f 100644 --- a/.env.example +++ b/.env.example @@ -252,6 +252,24 @@ DATABASE_URL=postgres://user:password@localhost:5432/boost_dashboard # Path to context repository (where markdown files are exported) # DISCORD_CONTEXT_REPO_PATH=F:\boost\discord-cplusplus-together-context +# Optional: upload exported markdown via GitHub API (same pattern as BOOST_LIBRARY_TRACKER_REPO_*) +# DISCORD_MARKDOWN_REPO_OWNER=your-org +# DISCORD_MARKDOWN_REPO_NAME=your-discord-md-repo +# DISCORD_MARKDOWN_REPO_BRANCH=main + +# Per-day exporter (run_discord_channel_export): optional override for script directory; default uses bundled offline_scripts/ +# DISCORD_EXPORT_SCRIPT_DIR= +# DISCORD_CHANNEL_EXPORT_NAMES=Discussion - c-cpp-discussion +# DISCORD_EXPORT_TIMEZONE=America/New_York +# DISCORD_EXPORT_CHUNK_DAYS=1 + +# Pinecone (run_discord_exporter after markdown export) +# DISCORD_PINECONE_APP_TYPE=discord-together-cpp +# DISCORD_PINECONE_NAMESPACE=discord-cplusplus + +# DiscordChatExporter version (Tyrrrz/DiscordChatExporter releases); zip download + git clone +# DISCORD_CHAT_EXPORTER_VERSION=2.47 + # ============================================================================= # YouTube (cppa_youtube_script_tracker) # ============================================================================= diff --git a/.gitignore b/.gitignore index d9be1a3..2e680a9 100644 --- a/.gitignore +++ b/.gitignore @@ -41,8 +41,9 @@ celerybeat.pid *.swo .cursor/ -# DiscordChatExporter CLI (download to workspace/discord_activity_tracker/tools/) +# DiscordChatExporter CLI / source build (workspace/discord_activity_tracker/) discord_activity_tracker/tools/ +discord_activity_tracker/vendor/ # macOS .DS_Store ._* diff --git a/config/settings.py b/config/settings.py index cecb131..fad1939 100644 --- a/config/settings.py +++ b/config/settings.py @@ -453,6 +453,40 @@ def _slack_team_scope_from_env(): ) ).resolve() +# Markdown upload (GitHub API) — optional; same pattern as BOOST_LIBRARY_TRACKER_REPO_* +DISCORD_MARKDOWN_REPO_OWNER = ( + env("DISCORD_MARKDOWN_REPO_OWNER", default="") or "" +).strip() +DISCORD_MARKDOWN_REPO_NAME = ( + env("DISCORD_MARKDOWN_REPO_NAME", default="") or "" +).strip() +DISCORD_MARKDOWN_REPO_BRANCH = ( + env("DISCORD_MARKDOWN_REPO_BRANCH", default="main") or "main" +).strip() + +# Per-day export script (export_guild_by_day.py) — defaults match workspace/script +DISCORD_EXPORT_SCRIPT_DIR = ( + env("DISCORD_EXPORT_SCRIPT_DIR", default="") or "" +).strip() # empty = package offline_scripts/ +DISCORD_CHANNEL_EXPORT_NAMES = ( + env("DISCORD_CHANNEL_EXPORT_NAMES", default="Discussion - c-cpp-discussion") or "" +).strip() +DISCORD_EXPORT_TIMEZONE = ( + env("DISCORD_EXPORT_TIMEZONE", default="America/New_York") or "America/New_York" +).strip() +DISCORD_EXPORT_CHUNK_DAYS = int(env("DISCORD_EXPORT_CHUNK_DAYS", default="1") or "1") + +# Pinned DiscordChatExporter release (Tyrrrz/DiscordChatExporter); zip + git clone use this tag +DISCORD_CHAT_EXPORTER_VERSION = ( + env("DISCORD_CHAT_EXPORTER_VERSION", default="2.47") or "2.47" +).strip() + +# Pinecone (cppa_pinecone_sync) for Discord messages +DISCORD_PINECONE_APP_TYPE = (env("DISCORD_PINECONE_APP_TYPE", default="") or "").strip() +DISCORD_PINECONE_NAMESPACE = ( + env("DISCORD_PINECONE_NAMESPACE", default="") or "" +).strip() + # WG21 Paper Tracker Configuration WG21_GITHUB_DISPATCH_ENABLED = env.bool("WG21_GITHUB_DISPATCH_ENABLED", default=False) WG21_GITHUB_DISPATCH_REPO = (env("WG21_GITHUB_DISPATCH_REPO", 
default="") or "").strip() diff --git a/config/test_settings.py b/config/test_settings.py index c1c3f83..d3581a2 100644 --- a/config/test_settings.py +++ b/config/test_settings.py @@ -69,3 +69,11 @@ CLANG_GITHUB_CONTEXT_REPO_OWNER = "" CLANG_GITHUB_CONTEXT_REPO_NAME = "" CLANG_GITHUB_CONTEXT_REPO_BRANCH = "" + +# Discord: no real markdown repo / Pinecone in tests +DISCORD_MARKDOWN_REPO_OWNER = "" +DISCORD_MARKDOWN_REPO_NAME = "" +DISCORD_PINECONE_APP_TYPE = "" +DISCORD_PINECONE_NAMESPACE = "" + +DISCORD_CHAT_EXPORTER_VERSION = "2.47" diff --git a/discord_activity_tracker/.gitignore b/discord_activity_tracker/.gitignore new file mode 100644 index 0000000..d193b47 --- /dev/null +++ b/discord_activity_tracker/.gitignore @@ -0,0 +1,6 @@ +# Built / downloaded DiscordChatExporter CLI (also listed in repo root .gitignore) +tools/ +vendor/ +# Local overrides (tokens, paths) — never commit secrets +*.local +.env.local diff --git a/discord_activity_tracker/github_publish.py b/discord_activity_tracker/github_publish.py new file mode 100644 index 0000000..aabcbe4 --- /dev/null +++ b/discord_activity_tracker/github_publish.py @@ -0,0 +1,70 @@ +"""Upload Discord markdown exports to a GitHub repo (API upload, no local git required).""" + +from __future__ import annotations + +import logging +from pathlib import Path + +from django.conf import settings + +from github_ops import get_github_token, upload_folder_to_github + +logger = logging.getLogger(__name__) + +DEFAULT_BRANCH = "main" + + +def discord_markdown_repo_config() -> tuple[str, str, str] | None: + """Return (owner, repo, branch) for Markdown upload, or None if not configured.""" + owner = getattr(settings, "DISCORD_MARKDOWN_REPO_OWNER", "") or "" + repo = getattr(settings, "DISCORD_MARKDOWN_REPO_NAME", "") or "" + branch = ( + getattr(settings, "DISCORD_MARKDOWN_REPO_BRANCH", DEFAULT_BRANCH) + or DEFAULT_BRANCH + ).strip() + owner = owner.strip() + repo = repo.strip() + if not owner or not repo: + return None + return owner, repo, branch + + +def push_discord_markdown_to_github(local_folder: Path) -> bool: + """ + Upload all files under local_folder to DISCORD_MARKDOWN_REPO_*. + Returns True on reported API success. + """ + cfg = discord_markdown_repo_config() + if not cfg: + logger.error( + "DISCORD_MARKDOWN_REPO_OWNER / DISCORD_MARKDOWN_REPO_NAME not set; " + "skipping upload." 
+ ) + return False + owner, repo, branch = cfg + if not local_folder.is_dir(): + logger.error("Markdown folder is not a directory: %s", local_folder) + return False + + logger.info( + "Uploading Discord markdown from %s to %s/%s@%s", + local_folder, + owner, + repo, + branch, + ) + token = get_github_token(use="write") + result = upload_folder_to_github( + local_folder=local_folder, + owner=owner, + repo=repo, + commit_message="chore: update Discord archive markdown", + branch=branch, + token=token, + ) + if result.get("success"): + logger.info("Discord markdown upload complete") + return True + msg = result.get("message") or "Upload failed" + logger.error("Discord markdown upload failed: %s", msg) + return False diff --git a/discord_activity_tracker/management/commands/backfill_discord_json.py b/discord_activity_tracker/management/commands/backfill_discord_json.py new file mode 100644 index 0000000..522adab --- /dev/null +++ b/discord_activity_tracker/management/commands/backfill_discord_json.py @@ -0,0 +1,151 @@ +"""Import historical DiscordChatExporter JSON (per-day / chunk layout) into the database.""" + +from __future__ import annotations + +import asyncio +import logging +from pathlib import Path + +from django.conf import settings +from django.core.management.base import BaseCommand, CommandError + +from core.utils.datetime_parsing import parse_iso_datetime +from discord_activity_tracker.sync.backfill_paths import ( + iter_discussion_json_files, + json_path_in_date_window, +) +from discord_activity_tracker.sync.chat_exporter import parse_exported_json +from discord_activity_tracker.sync.importer import persist_exporter_channel_payloads +from discord_activity_tracker.workspace import get_discussion_export_dir + +logger = logging.getLogger(__name__) + + +class Command(BaseCommand): + help = ( + "Backfill Discord messages from DiscordChatExporter JSON files (e.g. " + "workspace/discord_activity_tracker/Discussion - c-cpp-discussion/). " + "Processes one file at a time. Use --dry-run to list files only." + ) + + def add_arguments(self, parser): + parser.add_argument( + "--path", + type=str, + default=None, + help=( + "Root directory to scan (default: workspace/discord_activity_tracker/" + "Discussion - c-cpp-discussion/)." + ), + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="List files that would be imported; do not write to the database.", + ) + parser.add_argument( + "--limit", + type=int, + default=0, + metavar="N", + help="Process at most N JSON files (0 = no limit).", + ) + parser.add_argument( + "--guild-id", + type=int, + default=None, + help="If set, skip JSON whose guild id does not match (default: DISCORD_SERVER_ID).", + ) + parser.add_argument( + "--since", + "--from-date", + "--start-time", + type=str, + default=None, + dest="since", + help="Only include files whose day/chunk range overlaps this date (YYYY-MM-DD or ISO). " + "Aliases: --from-date, --start-time.", + ) + parser.add_argument( + "--until", + "--to-date", + "--end-time", + type=str, + default=None, + dest="until", + help="Only include files up to this date (same formats as --since). 
" + "Aliases: --to-date, --end-time.", + ) + + def handle(self, *args, **options): + dry_run = options["dry_run"] + limit = max(0, int(options["limit"] or 0)) + path_arg = options["path"] + root = ( + Path(path_arg).expanduser().resolve() + if path_arg + else get_discussion_export_dir() + ) + + try: + since_dt = parse_iso_datetime(options.get("since")) + until_dt = parse_iso_datetime(options.get("until")) + except ValueError as e: + raise CommandError(str(e)) from e + if since_dt and until_dt and since_dt > until_dt: + raise CommandError("since must be on or before until") + + cfg_guild = (getattr(settings, "DISCORD_SERVER_ID", "") or "").strip() + expected = options["guild_id"] + if expected is None and cfg_guild: + try: + expected = int(cfg_guild) + except ValueError: + expected = None + + if not root.is_dir(): + raise CommandError(f"Not a directory: {root}") + + paths = list(iter_discussion_json_files(root)) + paths = [p for p in paths if json_path_in_date_window(p, since_dt, until_dt)] + if limit: + paths = paths[:limit] + + self.stdout.write(f"Found {len(paths)} JSON file(s) under {root}") + if not paths: + return + + if dry_run: + for p in paths: + self.stdout.write(f" {p.relative_to(root)}") + self.stdout.write(self.style.WARNING("DRY RUN — no database writes")) + return + + processed = 0 + for i, json_path in enumerate(paths, 1): + try: + data = parse_exported_json(json_path) + channel_data = { + "guild": data.get("guild", {}), + "channel": data.get("channel", {}), + "messages": data.get("messages", []), + } + ch = channel_data["channel"].get("name", "?") + n = len(channel_data["messages"]) + self.stdout.write( + f" [{i}/{len(paths)}] {json_path.name} #{ch}: {n} msgs" + ) + asyncio.run( + persist_exporter_channel_payloads( + [channel_data], + expected_guild_id=expected, + ) + ) + processed += 1 + except Exception as e: + logger.exception("Backfill failed for %s: %s", json_path, e) + self.stdout.write(self.style.WARNING(f" Skip {json_path.name}: {e}")) + + self.stdout.write( + self.style.SUCCESS(f"✓ Backfill finished ({processed} file(s))") + ) diff --git a/discord_activity_tracker/management/commands/debug_discord_export.py b/discord_activity_tracker/management/commands/debug_discord_export.py index c7ae80e..09a8193 100644 --- a/discord_activity_tracker/management/commands/debug_discord_export.py +++ b/discord_activity_tracker/management/commands/debug_discord_export.py @@ -7,19 +7,21 @@ class Command(BaseCommand): - help = "Inspect reply links and exported markdown" + help = "Inspect reply links and raw markdown preview for Discord messages stored in the database." 
def add_arguments(self, parser): parser.add_argument( "--message-id", type=int, - help="Inspect a specific message by message_id (Discord message ID)", + metavar="ID", + help="Inspect a specific message by Discord snowflake message_id.", ) parser.add_argument( "--limit", type=int, default=5, - help="Number of reply messages to show (default: 5)", + metavar="N", + help="When listing replies, show at most N rows (default: 5).", ) def handle(self, *args, **options): diff --git a/discord_activity_tracker/management/commands/run_discord_activity_tracker.py b/discord_activity_tracker/management/commands/run_discord_activity_tracker.py index d20ce74..14f8dfa 100644 --- a/discord_activity_tracker/management/commands/run_discord_activity_tracker.py +++ b/discord_activity_tracker/management/commands/run_discord_activity_tracker.py @@ -1,65 +1,114 @@ -"""Django management command - sync messages and export to markdown.""" +"""Django management command - sync messages and export to markdown (Discord bot token).""" import logging from pathlib import Path -from django.core.management.base import BaseCommand from django.conf import settings +from django.core.management.base import BaseCommand, CommandError -from discord_activity_tracker.models import DiscordServer, DiscordChannel -from discord_activity_tracker.sync.messages import sync_all_channels +from core.utils.datetime_parsing import parse_iso_datetime +from discord_activity_tracker.models import DiscordChannel, DiscordServer from discord_activity_tracker.sync.export import export_and_push +from discord_activity_tracker.sync.messages import sync_all_channels logger = logging.getLogger(__name__) class Command(BaseCommand): - help = "Run Discord Activity Tracker: sync messages and export to markdown" + help = ( + "Run Discord Activity Tracker: sync messages (Discord API / bot token) and export to markdown. " + "Use --since/--until to bound the API sync window when supported. " + "Markdown export still uses active channel filters (months / active-days)." + ) def add_arguments(self, parser): parser.add_argument( "--dry-run", action="store_true", - help="Preview actions without executing them", + help="Preview actions without executing them (no database writes).", ) parser.add_argument( "--task", type=str, - default=None, - help="Task to run: sync, export, or all (default: all)", + default="all", + choices=["sync", "export", "all"], + help="Run sync, export markdown, or both (default: all).", + ) + parser.add_argument( + "--skip-sync", + action="store_true", + help="Skip Discord API message sync (with --task all or sync).", + ) + parser.add_argument( + "--skip-markdown-export", + action="store_true", + help="Skip markdown export (with --task all or export).", ) parser.add_argument( "--full-sync", action="store_true", - help="Sync all messages (ignore last_synced_at)", + help="Sync all messages (ignore last_synced_at).", ) parser.add_argument( "--months", type=int, default=12, - help="Number of months to export (default: 12)", + help="Number of months to export (default: 12).", ) parser.add_argument( "--active-days", type=int, default=30, - help="Number of days to consider a channel active (default: 30)", + help="Number of days to consider a channel active (default: 30).", + ) + parser.add_argument( + "--since", + "--from-date", + "--start-time", + type=str, + default=None, + dest="since", + help="Lower bound for message sync (YYYY-MM-DD or ISO-8601). Passed to sync_all_channels. 
" + "--from-date and --start-time are deprecated aliases for --since.", + ) + parser.add_argument( + "--until", + "--to-date", + "--end-time", + type=str, + default=None, + dest="until", + help="Upper bound for message sync (same formats as --since). " + "Note: the Discord client currently fetches forward from the sync cursor; " + "this value is passed for API consistency where supported. " + "--to-date and --end-time are deprecated aliases for --until.", ) def handle(self, *args, **options): dry_run = options["dry_run"] - task_filter = (options["task"] or "").strip().lower() + task = options["task"] + skip_sync = options["skip_sync"] + skip_markdown_export = options["skip_markdown_export"] full_sync = options["full_sync"] months = options["months"] active_days = options["active_days"] try: - # Get settings + since_dt = parse_iso_datetime(options.get("since")) + until_dt = parse_iso_datetime(options.get("until")) + except ValueError as e: + raise CommandError(str(e)) from e + if since_dt and until_dt and since_dt > until_dt: + logger.warning( + "Invalid date range: since after until; ignoring both for sync window" + ) + since_dt = until_dt = None + + try: token = getattr(settings, "DISCORD_TOKEN", None) guild_id = getattr(settings, "DISCORD_SERVER_ID", None) context_repo_path = getattr(settings, "DISCORD_CONTEXT_REPO_PATH", None) - # Validate settings if not token: self.stdout.write(self.style.ERROR("DISCORD_TOKEN not configured")) return @@ -68,26 +117,35 @@ def handle(self, *args, **options): self.stdout.write(self.style.ERROR("DISCORD_SERVER_ID not configured")) return - if not context_repo_path: + markdown_wanted = (task in ("export", "all")) and not skip_markdown_export + if markdown_wanted and not context_repo_path: self.stdout.write( self.style.ERROR("DISCORD_CONTEXT_REPO_PATH not configured") ) return - context_repo_path = Path(context_repo_path) + context_repo_path = ( + Path(context_repo_path).resolve() if context_repo_path else None + ) + guild_id = int(guild_id) + + run_sync = task in ("sync", "all") and not skip_sync + if skip_sync and task in ("sync", "all"): + self.stdout.write( + self.style.WARNING("--skip-sync: skipping Discord API message sync") + ) - # Task 1: Sync Discord messages - if not task_filter or task_filter == "sync" or task_filter == "all": + if run_sync: self._task_sync_messages( dry_run=dry_run, token=token, guild_id=guild_id, full_sync=full_sync, active_days=active_days, + since_date=since_dt, ) - # Task 2: Export to markdown - if not task_filter or task_filter == "export" or task_filter == "all": + if markdown_wanted: self._task_export_markdown( dry_run=dry_run, guild_id=guild_id, @@ -95,6 +153,12 @@ def handle(self, *args, **options): months=months, active_days=active_days, ) + elif task in ("export", "all") and skip_markdown_export: + self.stdout.write( + self.style.WARNING( + "--skip-markdown-export: skipping markdown export step" + ) + ) self.stdout.write( self.style.SUCCESS("✓ Discord activity tracker completed") @@ -111,24 +175,27 @@ def _task_sync_messages( guild_id: int, full_sync: bool, active_days: int, + since_date, ): """Sync messages from Discord API to database.""" self.stdout.write("Task 1: Syncing Discord messages...") if dry_run: - # Preview what would be synced try: server = DiscordServer.objects.get(server_id=guild_id) channels = DiscordChannel.objects.filter(server=server) if not full_sync: from datetime import timedelta + from django.utils import timezone cutoff = timezone.now() - timedelta(days=active_days) channels = 
channels.filter(last_activity_at__gte=cutoff) self.stdout.write(f" Would sync {channels.count()} channels") + if since_date: + self.stdout.write(f" Would use sync lower bound: {since_date}") for channel in channels: last_sync = channel.last_synced_at or "never" self.stdout.write( @@ -140,19 +207,17 @@ def _task_sync_messages( return - # Actual sync - logger.info(f"Syncing messages from Discord guild {guild_id}") + logger.info("Syncing messages from Discord guild %s", guild_id) sync_all_channels( token=token, guild_id=guild_id, - since_date=None, + since_date=since_date, full_sync=full_sync, - active_only=not full_sync, # If full sync, include all channels + active_only=not full_sync, active_days=active_days, ) - # Report results server = DiscordServer.objects.get(server_id=guild_id) total_channels = DiscordChannel.objects.filter(server=server).count() total_messages = sum( @@ -189,6 +254,7 @@ def _task_export_markdown( if dry_run: from datetime import timedelta + from django.utils import timezone cutoff = timezone.now() - timedelta(days=active_days) @@ -207,7 +273,7 @@ def _task_export_markdown( return - logger.info(f"Exporting to markdown: {context_repo_path}") + logger.info("Exporting to markdown: %s", context_repo_path) success = export_and_push( context_repo_path=context_repo_path, diff --git a/discord_activity_tracker/management/commands/run_discord_channel_export.py b/discord_activity_tracker/management/commands/run_discord_channel_export.py new file mode 100644 index 0000000..136b784 --- /dev/null +++ b/discord_activity_tracker/management/commands/run_discord_channel_export.py @@ -0,0 +1,107 @@ +"""Run bundled export_guild_by_day.py (per-day DiscordChatExporter) with Django settings.""" + +from __future__ import annotations + +import logging +import os +import subprocess +import sys +from pathlib import Path + +import discord_activity_tracker +from django.conf import settings +from django.core.management.base import BaseCommand, CommandError + +from discord_activity_tracker.sync.dce_cli import ( + DiscordChatExporterCliNotFoundError, + ensure_discord_chat_exporter_cli, +) +from discord_activity_tracker.workspace import get_workspace_root + +logger = logging.getLogger(__name__) + + +def resolve_export_script() -> Path: + """Path to export_guild_by_day.py (bundled or DISCORD_EXPORT_SCRIPT_DIR).""" + custom = (getattr(settings, "DISCORD_EXPORT_SCRIPT_DIR", "") or "").strip() + if custom: + p = Path(custom).expanduser().resolve() / "export_guild_by_day.py" + if p.is_file(): + return p + raise CommandError( + f"DISCORD_EXPORT_SCRIPT_DIR is set but script not found: {p}" + ) + p = ( + Path(discord_activity_tracker.__file__).resolve().parent + / "offline_scripts" + / "export_guild_by_day.py" + ) + if not p.is_file(): + raise CommandError(f"Bundled export script missing: {p}") + return p + + +class Command(BaseCommand): + help = ( + "Run export_guild_by_day.py (DiscordChatExporter per channel/day) using " + "DISCORD_USER_TOKEN, DISCORD_SERVER_ID, and workspace paths. " + "The CLI is resolved via ensure_discord_chat_exporter_cli() (pinned release zip on Windows, " + "or git clone + dotnet publish on macOS/Linux when tools/ is empty). " + "Import JSON into the DB with: python manage.py backfill_discord_json." 
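+ " Env consumed by the script: TOKEN, GUILD_ID, EXPORT_ROOT, CLI, CHANNEL_TO_EXPORT, TIMEZONE, EXPORT_CHUNK_DAYS."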
+ ) + + def add_arguments(self, parser): + parser.add_argument( + "--dry-run", + action="store_true", + help="Print resolved script, CLI, and env; do not execute the Python exporter.", + ) + + def handle(self, *args, **options): + dry_run = options["dry_run"] + token = (getattr(settings, "DISCORD_USER_TOKEN", "") or "").strip() + guild_id = (getattr(settings, "DISCORD_SERVER_ID", "") or "").strip() + if not token: + raise CommandError("DISCORD_USER_TOKEN is not set") + if not guild_id: + raise CommandError("DISCORD_SERVER_ID is not set") + + script_path = resolve_export_script() + workspace_root = get_workspace_root() + try: + cli_path = ensure_discord_chat_exporter_cli() + except DiscordChatExporterCliNotFoundError as e: + raise CommandError(str(e)) from e + + env = os.environ.copy() + env["TOKEN"] = token + env["GUILD_ID"] = str(guild_id) + env["EXPORT_ROOT"] = str(workspace_root) + env["CLI"] = str(cli_path) + env["CHANNEL_TO_EXPORT"] = getattr( + settings, "DISCORD_CHANNEL_EXPORT_NAMES", "Discussion - c-cpp-discussion" + ) + env["TIMEZONE"] = getattr( + settings, "DISCORD_EXPORT_TIMEZONE", "America/New_York" + ) + env["EXPORT_CHUNK_DAYS"] = str( + int(getattr(settings, "DISCORD_EXPORT_CHUNK_DAYS", 1) or 1) + ) + + self.stdout.write(f"Script: {script_path}") + self.stdout.write(f"CLI: {cli_path}") + self.stdout.write(f"EXPORT_ROOT: {workspace_root}") + + if dry_run: + self.stdout.write(self.style.WARNING("DRY RUN — not invoking exporter")) + return + + cwd = script_path.parent + cmd = [sys.executable, str(script_path)] + logger.info("Running %s with cwd=%s", cmd, cwd) + proc = subprocess.run(cmd, cwd=cwd, env=env, check=False) + if proc.returncode != 0: + raise CommandError( + f"export_guild_by_day.py exited with code {proc.returncode}" + ) + self.stdout.write(self.style.SUCCESS("export_guild_by_day.py finished")) diff --git a/discord_activity_tracker/management/commands/run_discord_exporter.py b/discord_activity_tracker/management/commands/run_discord_exporter.py index 9b019f2..4d457bc 100644 --- a/discord_activity_tracker/management/commands/run_discord_exporter.py +++ b/discord_activity_tracker/management/commands/run_discord_exporter.py @@ -1,76 +1,162 @@ """Django management command - sync using DiscordChatExporter CLI with user token.""" +import asyncio import logging +from datetime import datetime, timedelta from pathlib import Path -from datetime import timedelta +from typing import Optional -from django.core.management.base import BaseCommand from django.conf import settings +from django.core.management.base import BaseCommand, CommandError from django.utils import timezone as django_timezone -from asgiref.sync import sync_to_async -from discord_activity_tracker.models import DiscordServer, DiscordChannel +from core.utils.datetime_parsing import parse_iso_datetime +from discord_activity_tracker.github_publish import push_discord_markdown_to_github +from discord_activity_tracker.models import DiscordChannel, DiscordServer from discord_activity_tracker.sync.chat_exporter import ( export_guild_to_json, parse_exported_json, - convert_exporter_message_to_dict, ) -from discord_activity_tracker.sync.messages import ( - _process_messages_in_batches, -) -from discord_activity_tracker.sync.utils import parse_datetime from discord_activity_tracker.sync.export import export_and_push -from discord_activity_tracker.services import ( - get_or_create_discord_server, - get_or_create_discord_channel, - update_channel_last_synced, - update_channel_last_activity, -) +from 
discord_activity_tracker.sync.importer import persist_exporter_channel_payloads from discord_activity_tracker.workspace import get_raw_dir logger = logging.getLogger(__name__) +PINECONE_NAMESPACE_ENV_KEY = "DISCORD_PINECONE_NAMESPACE" + + +def _aware_utc(dt: Optional[datetime]) -> Optional[datetime]: + if dt is None: + return None + if django_timezone.is_naive(dt): + return django_timezone.make_aware(dt, django_timezone.utc) + return dt + + +def _run_discord_pinecone_sync(*, dry_run: bool) -> None: + """Upsert Discord messages to Pinecone via run_cppa_pinecone_sync.""" + from django.core.management import call_command + + app_type = (getattr(settings, "DISCORD_PINECONE_APP_TYPE", "") or "").strip() + namespace = (getattr(settings, "DISCORD_PINECONE_NAMESPACE", "") or "").strip() + if not app_type: + logger.warning( + "Pinecone sync skipped: DISCORD_PINECONE_APP_TYPE is empty (settings/env)." + ) + return + if not namespace: + logger.warning( + "Pinecone sync skipped: namespace is empty (set %s or Django setting).", + PINECONE_NAMESPACE_ENV_KEY, + ) + return + if dry_run: + logger.info("dry-run would run Pinecone sync for Discord messages") + return + try: + call_command( + "run_cppa_pinecone_sync", + app_type=app_type, + namespace=namespace, + preprocessor=( + "discord_activity_tracker.preprocessors.discord_preprocessor." + "preprocess_discord_for_pinecone" + ), + ) + logger.info( + "Pinecone sync completed (app_type=%s, namespace=%s)", + app_type, + namespace, + ) + except Exception as exc: + logger.warning( + "Pinecone sync skipped/failed (run_cppa_pinecone_sync unavailable or errored): %s", + exc, + ) + class Command(BaseCommand): help = ( - "Run Discord Activity Tracker using DiscordChatExporter CLI (user token method)" + "Run Discord Activity Tracker using DiscordChatExporter CLI (user token method). " + "After markdown export, optionally push to DISCORD_MARKDOWN_REPO_* and run Pinecone sync." 
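+ " Use --skip-github-push / --skip-pinecone to disable those steps."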
) def add_arguments(self, parser): parser.add_argument( "--dry-run", action="store_true", - help="Preview actions without writing to database", + help="Preview actions without writing to the database (no markdown writes in export steps).", ) parser.add_argument( "--task", type=str, default="all", choices=["sync", "export", "all", "import-only"], - help="Task to run: sync, export, all, or import-only (default: all)", + help="Task to run: sync, export, all, or import-only (default: all).", ) parser.add_argument( "--full-sync", action="store_true", - help="Sync all messages (ignore last_synced_at)", + help="Sync all messages (ignore last_synced_at).", ) parser.add_argument( "--months", type=int, default=12, - help="Number of months to export to markdown (default: 12)", + help="Number of months to export to markdown (default: 12).", ) parser.add_argument( "--active-days", type=int, default=30, - help="Number of days to consider a channel active (default: 30)", + help="Number of days to consider a channel active (default: 30).", ) parser.add_argument( "--days-back", type=int, default=30, - help="Number of days back to sync messages (default: 30, 0 for all history)", + help="Number of days back to sync messages (default: 30, 0 for all history).", + ) + parser.add_argument( + "--skip-github-push", + action="store_true", + help="Skip uploading markdown to DISCORD_MARKDOWN_REPO_* after export.", + ) + parser.add_argument( + "--skip-pinecone", + action="store_true", + help="Skip run_cppa_pinecone_sync for Discord messages.", + ) + parser.add_argument( + "--skip-sync", + action="store_true", + help="Skip exportguild → DB sync when --task is all or sync.", + ) + parser.add_argument( + "--skip-markdown-export", + action="store_true", + help="Skip writing markdown to DISCORD_CONTEXT_REPO_PATH (export / all / import-only).", + ) + parser.add_argument( + "--since", + "--from-date", + "--start-time", + type=str, + default=None, + dest="since", + help="Sync window start (YYYY-MM-DD or ISO-8601); passed to exportguild --after. " + "--from-date and --start-time are deprecated aliases for --since.", + ) + parser.add_argument( + "--until", + "--to-date", + "--end-time", + type=str, + default=None, + dest="until", + help="Sync window end (same formats as --since); passed to exportguild --before. 
" + "--to-date and --end-time are deprecated aliases for --until.", ) def handle(self, *args, **options): @@ -80,6 +166,26 @@ def handle(self, *args, **options): months = options["months"] active_days = options["active_days"] days_back = options["days_back"] + skip_github_push = options["skip_github_push"] + skip_pinecone = options["skip_pinecone"] + skip_sync = options["skip_sync"] + skip_markdown_export = options["skip_markdown_export"] + + try: + since_dt = parse_iso_datetime(options.get("since")) + until_dt = parse_iso_datetime(options.get("until")) + except ValueError as e: + raise CommandError(str(e)) from e + if since_dt and until_dt and since_dt > until_dt: + logger.warning( + "Invalid date range: since (%s) after until (%s); ignoring both", + since_dt.isoformat(), + until_dt.isoformat(), + ) + since_dt = until_dt = None + + since_aware = _aware_utc(since_dt) + until_aware = _aware_utc(until_dt) try: user_token = getattr(settings, "DISCORD_USER_TOKEN", None) @@ -97,35 +203,84 @@ def handle(self, *args, **options): self.stdout.write(self.style.ERROR("DISCORD_SERVER_ID not configured")) return - if not context_repo_path: + markdown_wanted = (task in ("export", "all")) or ( + task == "import-only" and not skip_markdown_export + ) + if markdown_wanted and not context_repo_path: self.stdout.write( self.style.ERROR("DISCORD_CONTEXT_REPO_PATH not configured") ) return - context_repo_path = Path(context_repo_path) + context_repo_path = ( + Path(context_repo_path).resolve() if context_repo_path else None + ) guild_id = int(guild_id) - if task in ["sync", "all"]: + run_sync = task in ("sync", "all") and not skip_sync + if skip_sync and task in ("sync", "all"): + self.stdout.write( + self.style.WARNING("--skip-sync: skipping exportguild → DB sync") + ) + + if run_sync: self._sync_messages( dry_run=dry_run, user_token=user_token, guild_id=guild_id, full_sync=full_sync, days_back=days_back, + since_override=since_aware, + until_cutoff=until_aware, ) if task == "import-only": - self._import_json_files(dry_run=dry_run, guild_id=guild_id) + self._import_json_files( + dry_run=dry_run, + guild_id=guild_id, + since_override=since_aware, + until_cutoff=until_aware, + ) - if task in ["export", "all", "import-only"]: - self._export_markdown( + export_ok = False + if markdown_wanted: + export_ok = self._export_markdown( dry_run=dry_run, guild_id=guild_id, context_repo_path=context_repo_path, months=months, active_days=active_days, ) + elif task == "import-only" and skip_markdown_export: + self.stdout.write( + self.style.WARNING( + "--skip-markdown-export: skipping markdown export after import-only" + ) + ) + + if markdown_wanted and context_repo_path and not skip_github_push: + self.stdout.write("\n=== Uploading markdown to GitHub ===") + if dry_run: + self.stdout.write( + self.style.WARNING( + "DRY RUN — would upload markdown folder to " + "DISCORD_MARKDOWN_REPO_*" + ) + ) + elif export_ok: + push_discord_markdown_to_github(context_repo_path) + else: + self.stdout.write( + self.style.WARNING( + "Skipping GitHub upload (no markdown files exported this run)" + ) + ) + + if not skip_pinecone: + self.stdout.write("\n=== Pinecone sync (Discord) ===") + _run_discord_pinecone_sync(dry_run=dry_run) + else: + logger.info("skipping Pinecone (--skip-pinecone)") self.stdout.write(self.style.SUCCESS("✓ Discord exporter completed")) @@ -141,6 +296,8 @@ def _sync_messages( guild_id: int, full_sync: bool, days_back: int, + since_override: Optional[datetime], + until_cutoff: Optional[datetime], ): """Export messages via 
CLI and persist to database.""" self.stdout.write("\n=== Syncing Messages using DiscordChatExporter ===") @@ -200,11 +357,19 @@ def _sync_messages( else: self.stdout.write("First sync - fetching all messages") + if since_override is not None: + if after_date is None: + after_date = since_override + else: + after_date = max(after_date, since_override) + self.stdout.write(f" --since lower bound: {after_date}") + json_files = export_guild_to_json( user_token=user_token, guild_id=guild_id, output_dir=temp_dir, after_date=after_date, + before_date=until_cutoff, ) self.stdout.write(f"Exported {len(json_files)} channel files") @@ -217,9 +382,6 @@ def _sync_messages( self.stdout.write(f" #{ch.get('name', '?')}: {msg_count} messages") return - import asyncio - - # Process one file at a time to avoid loading 900MB+ into memory for i, json_path in enumerate(json_files, 1): try: data = parse_exported_json(json_path) @@ -233,7 +395,12 @@ def _sync_messages( self.stdout.write( f" [{i}/{len(json_files)}] #{ch_name}: {msg_count} messages" ) - asyncio.run(self._persist_exported_data(guild_id, [channel_data])) + asyncio.run( + persist_exporter_channel_payloads( + [channel_data], + expected_guild_id=guild_id, + ) + ) json_path.unlink() except Exception as e: logger.error(f"Failed to process {json_path.name}: {e}") @@ -245,57 +412,14 @@ def _sync_messages( logger.exception(f"Sync failed: {e}") raise - async def _persist_exported_data(self, guild_id: int, parsed_data: list): - """Write parsed channel data to the database using bulk operations.""" - for channel_data in parsed_data: - try: - guild_info = channel_data["guild"] - channel_info = channel_data["channel"] - messages = channel_data["messages"] - - server, _ = await sync_to_async(get_or_create_discord_server)( - server_id=guild_info["id"], - server_name=guild_info["name"], - icon_url="", - ) - - channel, _ = await sync_to_async(get_or_create_discord_channel)( - server=server, - channel_id=channel_info["id"], - channel_name=channel_info["name"], - channel_type=channel_info.get("type", "text"), - topic=channel_info.get("topic") or "", - position=0, - ) - - # Convert exporter format to internal format for bulk processing - converted = [convert_exporter_message_to_dict(msg) for msg in messages] - - processed = await _process_messages_in_batches(channel, converted) - - if messages: - last_msg = convert_exporter_message_to_dict(messages[-1]) - last_time = parse_datetime(last_msg.get("created_at")) - if last_time: - await sync_to_async(update_channel_last_activity)( - channel, last_time - ) - - await sync_to_async(update_channel_last_synced)(channel) - - logger.info( - f"Synced #{channel.channel_name}: " - f"{processed}/{len(messages)} messages" - ) - - except Exception as e: - logger.error( - f"Failed to persist channel {channel_info.get('name')}: {e}" - ) - continue - - def _import_json_files(self, dry_run: bool, guild_id: int): - """Import pre-exported JSON files from workspace/discord_activity_tracker/raw/ into the database.""" + def _import_json_files( + self, + dry_run: bool, + guild_id: int, + since_override: Optional[datetime], + until_cutoff: Optional[datetime], + ): + """Import pre-exported JSON files from raw/ into the database.""" self.stdout.write("\n=== Importing JSON Files ===") temp_dir = get_raw_dir() @@ -317,6 +441,24 @@ def _import_json_files(self, dry_run: bool, guild_id: int): for json_path in json_files: try: data = parse_exported_json(json_path) + if since_override is not None or until_cutoff is not None: + msgs = data.get("messages") or 
[] + if msgs: + from discord_activity_tracker.sync.utils import parse_datetime + + times = [] + for m in msgs: + t = parse_datetime( + m.get("timestamp") or m.get("created_at") + ) + if t: + times.append(t) + if times: + lo, hi = min(times), max(times) + if since_override is not None and hi < since_override: + continue + if until_cutoff is not None and lo > until_cutoff: + continue parsed_data.append( { "guild": data.get("guild", {}), @@ -336,9 +478,12 @@ self.stdout.write(f"Importing {len(parsed_data)} channels...") - import asyncio - - asyncio.run(self._persist_exported_data(guild_id, parsed_data)) + asyncio.run( + persist_exporter_channel_payloads( + parsed_data, + expected_guild_id=guild_id, + ) + ) self.stdout.write(self.style.SUCCESS(f"✓ Imported {len(parsed_data)} channels")) @@ -349,13 +494,13 @@ def _export_markdown( self, dry_run: bool, guild_id: int, context_repo_path: Path, months: int, active_days: int, - ): - """Export to markdown files.""" + ) -> bool: + """Export to markdown files. Returns True if at least one file was written.""" self.stdout.write("\n=== Exporting to Markdown ===") if dry_run: self.stdout.write(self.style.WARNING("DRY RUN - no file writes")) - return + return False try: server = DiscordServer.objects.get(server_id=guild_id) @@ -365,7 +510,7 @@ f"Server {guild_id} not found in database. Run sync first." ) ) - return + return False success = export_and_push( context_repo_path=context_repo_path, @@ -379,3 +524,4 @@ self.stdout.write(self.style.SUCCESS(f"✓ Exported to {context_repo_path}")) else: self.stdout.write(self.style.WARNING("No files exported")) + return bool(success) diff --git a/discord_activity_tracker/offline_scripts/export_guild_by_day.py b/discord_activity_tracker/offline_scripts/export_guild_by_day.py new file mode 100644 index 0000000..a42037c --- /dev/null +++ b/discord_activity_tracker/offline_scripts/export_guild_by_day.py @@ -0,0 +1,708 @@ +#!/usr/bin/env python3 +""" +Export a Discord guild by day or by N-day chunks. +Uses DiscordChatExporter.Cli: lists channels, then for each channel exports +with --after / --before (local timezone, converted to UTC). +- Default: one JSON per channel per day. Output: EXPORT_ROOT/OUTPUT_BASE/CHANNEL/YYYY/YYYY-MM/YYYY-MM-DD.json +- EXPORT_CHUNK_DAYS=10: one JSON per 10 days. Output: EXPORT_ROOT/OUTPUT_BASE/CHANNEL/YYYY-MM-DD_to_YYYY-MM-DD.json +- Multiple tokens: TOKENS=token1,token2,token3 (round-robin per channel). +- Resume: skips channel-days/chunks that already exist under INPUT_BASE or OUTPUT_BASE. +- REFRESH_LAST_DATE=1: re-export only the last day/chunk per channel. +- FETCH_FROM_LAST=1 (default): per channel, find last date in INPUT_BASE, then fetch from that date (incl.) to today. +- EXPORT_ROOT: defaults to cpp_discord_output/ under the project root (sibling of script/; same folder as run_export.py). INPUT_BASE / OUTPUT_BASE are subfolder names inside it (default read / output), or absolute paths. Relative EXPORT_ROOT is resolved under project root (not cwd). Existing JSON is detected under read/, output/, and directly under EXPORT_ROOT (legacy layout). Runs up to PARALLEL channels at a time; within each channel, exports are sequential.
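+Driver: run_discord_channel_export sets TOKEN, GUILD_ID, EXPORT_ROOT, CLI, CHANNEL_TO_EXPORT, TIMEZONE, and EXPORT_CHUNK_DAYS from Django settings, then runs this script with no arguments.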
+""" + +import os +import re +import subprocess +import sys +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime, timedelta +from pathlib import Path +from typing import Optional, Tuple + +try: + from zoneinfo import ZoneInfo +except ImportError: + ZoneInfo = None + try: + import pytz + except ImportError: + pytz = None + +# ---------- Config (override with env vars) ---------- +# Single token (legacy) or set TOKENS in env as comma-separated: TOKENS=token1,token2,token3 +# Script default: one string or a list of token strings. +_TOKEN_DEFAULT = [ + "", +] +if os.environ.get("TOKENS", "").strip(): + TOKENS = [t.strip() for t in os.environ["TOKENS"].split(",") if t.strip()] +else: + single = os.environ.get("TOKEN", "").strip() + if single: + TOKENS = [single] + elif isinstance(_TOKEN_DEFAULT, list): + TOKENS = [t for t in _TOKEN_DEFAULT if isinstance(t, str) and t.strip()] + else: + TOKENS = [str(_TOKEN_DEFAULT).strip()] if str(_TOKEN_DEFAULT).strip() else [] +GUILD_ID = os.environ.get("GUILD_ID", "331718482485837825") +START_DATE = os.environ.get("START_DATE", "2017-06-01") +END_DATE = os.environ.get("END_DATE", "").strip() # default: today +PARALLEL = int(os.environ.get("PARALLEL", "1")) +CLI = os.environ.get("CLI", "DiscordChatExporter.Cli.exe") +TIMEZONE = os.environ.get("TIMEZONE", "America/New_York") +# If True (or env FAIL_ON_CHANNEL_ERROR=1), exit with code 1 when any channel fails (e.g. forbidden). Default: False. +FAIL_ON_CHANNEL_ERROR = os.environ.get("FAIL_ON_CHANNEL_ERROR", "0").strip() == "1" +# If True (env REFRESH_LAST_DATE=1), re-export only the last date per channel (overwrite that day's file). +REFRESH_LAST_DATE = os.environ.get("REFRESH_LAST_DATE", "0").strip() == "1" +# If True (env FETCH_FROM_LAST=1), for each channel: find last date in INPUT_BASE, then fetch from that date (incl.) to today. Default: 0. +FETCH_FROM_LAST = os.environ.get("FETCH_FROM_LAST", "1").strip() == "1" +# Export in N-day chunks; file name is FROM_to_TO.json (e.g. 2017-06-01_to_2017-06-10). Use 1 for per-day (legacy). 
+EXPORT_CHUNK_DAYS = int(os.environ.get("EXPORT_CHUNK_DAYS", "1")) +# ---------------------------------------------------- + +SCRIPT_DIR = Path(__file__).resolve().parent +PROJECT_ROOT = SCRIPT_DIR.parent +# Data root next to script/: PROJECT_ROOT/cpp_discord_output/ (sibling of script/) +_export_root_env = os.environ.get("EXPORT_ROOT", "").strip() +if _export_root_env: + _er = Path(_export_root_env).expanduser() + EXPORT_ROOT = _er.resolve() if _er.is_absolute() else (PROJECT_ROOT / _er).resolve() +else: + EXPORT_ROOT = (PROJECT_ROOT / "cpp_discord_output").resolve() +# Subfolders inside EXPORT_ROOT (defaults: read = resume source, output = new JSON) +_DEFAULT_INPUT_SUBDIR = "read" +_DEFAULT_OUTPUT_SUBDIR = "output" +OUTPUT_BASE = os.environ.get("OUTPUT_BASE", _DEFAULT_OUTPUT_SUBDIR) +INPUT_BASE = os.environ.get("INPUT_BASE", _DEFAULT_INPUT_SUBDIR) + +_CHANNEL_DEFAULT = [ + "Discussion - c-cpp-discussion", +] +if os.environ.get("CHANNEL_TO_EXPORT", "").strip(): + CHANNEL_TO_EXPORT = [ + x.strip() for x in os.environ["CHANNEL_TO_EXPORT"].split(",") if x.strip() + ] +else: + CHANNEL_TO_EXPORT = _CHANNEL_DEFAULT + + +def resolve_cli(): + """CLI path: if relative, resolve against SCRIPT_DIR (for Windows).""" + p = Path(CLI) + if not p.is_absolute(): + p = SCRIPT_DIR / p + return str(p) + + +def _export_data_dir(base_key: str) -> Path: + """Read/write root: absolute path if base_key is absolute, else EXPORT_ROOT / base_key.""" + p = Path(base_key) + if p.is_absolute(): + return p.resolve() + return (EXPORT_ROOT / p).resolve() + + +# Characters to sanitize for path: \ / : * ? " < > | +SANITIZE_RE = re.compile(r'[\\/:*?"<>|]') + + +def strip_ansi(text: str) -> str: + """Remove ANSI escape sequences from CLI output.""" + return re.sub(r"\x1b\[[0-9;]*m", "", text) + + +def sanitize_name(name: str) -> str: + """Sanitize channel name for use in filesystem paths.""" + return SANITIZE_RE.sub("-", name).strip() + + +def channel_name_for_path(full: str) -> str: + """Strip guild (first segment): 'Guild - Category - Channel' -> 'Category - Channel'.""" + if " - " in full: + return full.split(" - ", 1)[1].strip() + return full.strip() + + +def get_timezone(): + """Return timezone object for TIMEZONE (zoneinfo or pytz).""" + if ZoneInfo is not None: + return ZoneInfo(TIMEZONE), ZoneInfo("UTC") + if pytz is not None: + return pytz.timezone(TIMEZONE), pytz.UTC + raise RuntimeError("Need zoneinfo (Python 3.9+) or pytz for timezone support") + + +def date_to_utc_range(date_str: str): + """Convert calendar day (local TZ) to (after_utc, before_utc) as 'YYYY-MM-DD HH:MM:SS'.""" + tz_local, utc = get_timezone() + start_local = datetime.strptime(date_str, "%Y-%m-%d") + if ZoneInfo is not None: + start_local = start_local.replace(tzinfo=tz_local) + else: + start_local = tz_local.localize(start_local) + end_local = start_local + timedelta(days=1) + after_utc = start_local.astimezone(utc).strftime("%Y-%m-%d %H:%M:%S") + before_utc = end_local.astimezone(utc).strftime("%Y-%m-%d %H:%M:%S") + return after_utc, before_utc + + +def date_range_to_utc_range(start_date_str: str, end_date_str: str): + """Convert date range (inclusive) to (after_utc, before_utc).
before_utc is start of day after end_date.""" + tz_local, utc = get_timezone() + start_local = datetime.strptime(start_date_str, "%Y-%m-%d") + end_local = datetime.strptime(end_date_str, "%Y-%m-%d") + timedelta(days=1) + if ZoneInfo is not None: + start_local = start_local.replace(tzinfo=tz_local) + end_local = end_local.replace(tzinfo=tz_local) + else: + start_local = tz_local.localize(start_local) + end_local = tz_local.localize(end_local) + after_utc = start_local.astimezone(utc).strftime("%Y-%m-%d %H:%M:%S") + before_utc = end_local.astimezone(utc).strftime("%Y-%m-%d %H:%M:%S") + return after_utc, before_utc + + +def get_chunks_from_dates( + dates_list: list[str], chunk_days: int +) -> list[Tuple[str, str]]: + """Split a list of YYYY-MM-DD dates into chunks of up to chunk_days. Returns [(start, end), ...].""" + if chunk_days <= 1 or not dates_list: + return [(d, d) for d in dates_list] + chunks = [] + i = 0 + while i < len(dates_list): + start = dates_list[i] + j = min(i + chunk_days, len(dates_list)) - 1 + end = dates_list[j] + chunks.append((start, end)) + i = j + 1 + return chunks + + +def _unique_export_bases() -> tuple[str, ...]: + """Folders to scan for existing exports (resume / FETCH_FROM_LAST). Dedup if INPUT_BASE == OUTPUT_BASE.""" + if INPUT_BASE == OUTPUT_BASE: + return (INPUT_BASE,) + return (INPUT_BASE, OUTPUT_BASE) + + +def _iter_scan_roots() -> tuple[Path, ...]: + """Roots searched for already-exported JSON (resume, skip-if-exists, last-date). + + Includes INPUT_BASE and OUTPUT_BASE under EXPORT_ROOT, plus EXPORT_ROOT itself + so legacy trees (channel/... directly under cpp_discord_output) are visible. + Deduped by resolved path. + """ + seen: set[Path] = set() + roots: list[Path] = [] + for base in _unique_export_bases(): + p = _export_data_dir(base) + r = p.resolve() + if r not in seen: + seen.add(r) + roots.append(p) + er = EXPORT_ROOT.resolve() + if er not in seen: + roots.append(EXPORT_ROOT) + return tuple(roots) + + +def output_file_exists(sanitized_name: str, date_str: str) -> bool: + """True if the JSON for this channel/day already exists (resume).""" + year = date_str[:4] + month = date_str[5:7] + rel = Path(sanitized_name) / year / f"{year}-{month}" / f"{date_str}.json" + for root in _iter_scan_roots(): + if (root / rel).is_file(): + return True + return False + + +def chunk_file_path( + sanitized_name: str, + start_date: str, + end_date: str, + *, + base: Optional[str] = None, +) -> Path: + """Path for a 10-day chunk file: base/CHANNEL/YYYY-MM-DD_to_YYYY-MM-DD.json. 
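e.g. chunk_file_path("Discussion - c-cpp-discussion", "2017-06-01", "2017-06-10") -> EXPORT_ROOT/output/Discussion - c-cpp-discussion/2017-06-01_to_2017-06-10.json.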
Default base=OUTPUT_BASE.""" + if base is None: + base = OUTPUT_BASE + return _export_data_dir(base) / sanitized_name / f"{start_date}_to_{end_date}.json" + + +def chunk_file_exists(sanitized_name: str, start_date: str, end_date: str) -> bool: + """True if the chunk file exists under any scanned export root.""" + name = f"{start_date}_to_{end_date}.json" + rel = Path(sanitized_name) / name + for root in _iter_scan_roots(): + if (root / rel).is_file(): + return True + return False + + +def get_missing_dates(sanitized_name: str, dates_list: list[str]) -> list[str]: + """Return only dates that are not yet exported (for resume).""" + return [d for d in dates_list if not output_file_exists(sanitized_name, d)] + + +def get_missing_chunks( + sanitized_name: str, chunks: list[Tuple[str, str]] +) -> list[Tuple[str, str]]: + """Return only chunks that are not yet exported (resume for chunk mode).""" + return [(s, e) for s, e in chunks if not chunk_file_exists(sanitized_name, s, e)] + + +def get_last_export_date(sanitized_name: str) -> Optional[str]: + """Return the latest (end) date that has a JSON file in this channel, or None. + Supports both per-day files (YYYY-MM-DD.json) and chunk files (YYYY-MM-DD_to_YYYY-MM-DD.json). + Scans read/, output/, and legacy paths directly under EXPORT_ROOT. + """ + dates = [] + for root in _iter_scan_roots(): + channel_dir = root / sanitized_name + if not channel_dir.is_dir(): + continue + for f in channel_dir.rglob("*.json"): + if not f.is_file(): + continue + stem = f.stem + if len(stem) == 10 and stem[4] == "-" and stem[7] == "-": + dates.append(stem) + elif "_to_" in stem: + parts = stem.split("_to_", 1) + if len(parts) == 2 and len(parts[1]) == 10: + dates.append(parts[1]) + return max(dates) if dates else None + + +def get_last_export_chunk( + sanitized_name: str, +) -> Optional[Tuple[str, str]]: + """Return (start_date, end_date) of the latest chunk file in this channel, or None. + Only used when EXPORT_CHUNK_DAYS > 1 (chunk mode). Scans read/, output/, and EXPORT_ROOT. + """ + best = None + best_end = None + for root in _iter_scan_roots(): + channel_dir = root / sanitized_name + if not channel_dir.is_dir(): + continue + for f in channel_dir.glob("*_to_*.json"): + if not f.is_file(): + continue + stem = f.stem + if "_to_" not in stem: + continue + parts = stem.split("_to_", 1) + if len(parts) != 2 or len(parts[0]) != 10 or len(parts[1]) != 10: + continue + if ( + parts[0][4] == "-" + and parts[0][7] == "-" + and parts[1][4] == "-" + and parts[1][7] == "-" + ): + if best_end is None or parts[1] > best_end: + best = (parts[0], parts[1]) + best_end = parts[1] + return best + + +def fetch_channels(token: str): + """Run CLI channels and parse (channel_id, sanitized_name) list. Skip threads.""" + exe = resolve_cli() + cmd = [exe, "channels", "--token", token, "--guild", GUILD_ID] + result = subprocess.run( + cmd, + cwd=SCRIPT_DIR, + capture_output=True, + text=True, + timeout=60, + ) + raw = (result.stdout or "") + (result.stderr or "") + raw = strip_ansi(raw) + if not raw.strip(): + raise SystemExit( + "Could not get channel list. Check token, guild ID, and network." 
+ ) + + channels = [] + for line in raw.splitlines(): + line = strip_ansi(line).strip() + if not line or " * " in line: # skip thread lines + continue + if " | " not in line: + continue + left, name = line.split(" | ", 1) + cid = left.strip().replace(" ", "") + if not cid.isdigit(): + continue + name = name.split(" Thread ")[0].split(" | ")[0].strip() + path_name = sanitize_name(channel_name_for_path(name)) + if not path_name or path_name == "-": + path_name = cid + if path_name not in CHANNEL_TO_EXPORT: + continue + channels.append((cid, path_name)) + return channels + + +def export_one_chunk( + channel_id: str, + sanitized_name: str, + start_date_str: str, + end_date_str: str, + token: str, + overwrite: bool = False, +) -> bool: + """Export one channel for a date range (e.g. 10 days). File: START_to_END.json. + Returns True on success, False on failure. + """ + out_file = chunk_file_path(sanitized_name, start_date_str, end_date_str) + if out_file.exists() and not overwrite: + print( + f"Skip (exists): {sanitized_name} {start_date_str}_to_{end_date_str}", + file=sys.stderr, + ) + return True + + out_file.parent.mkdir(parents=True, exist_ok=True) + after_utc, before_utc = date_range_to_utc_range(start_date_str, end_date_str) + + exe = resolve_cli() + cmd = [ + exe, + "export", + "--token", + token, + "--channel", + channel_id, + "--output", + str(out_file), + "--format", + "Json", + "--include-threads", + "None", + "--markdown", + "True", + "--respect-rate-limits", + "True", + "--after", + after_utc, + "--before", + before_utc, + ] + MAX_RETRIES = 3 + for i in range(MAX_RETRIES): + try: + proc = subprocess.run( + cmd, + cwd=SCRIPT_DIR, + capture_output=True, + text=True, + timeout=600, + stdin=subprocess.DEVNULL, + ) + if proc.returncode != 0: + print( + f"FAILED: {sanitized_name} {start_date_str}_to_{end_date_str} (channel {channel_id})", + file=sys.stderr, + ) + if proc.stderr: + print(proc.stderr, file=sys.stderr) + continue + print( + f"OK: {sanitized_name} {start_date_str}_to_{end_date_str}", + file=sys.stderr, + ) + return True + except subprocess.TimeoutExpired: + print( + f"TIMEOUT: {sanitized_name} {start_date_str}_to_{end_date_str} (channel {channel_id})", + file=sys.stderr, + ) + if i < MAX_RETRIES - 1: + print(f"Retrying... ({i + 1}/{MAX_RETRIES})", file=sys.stderr) + else: + print( + f"FAILED: {sanitized_name} {start_date_str}_to_{end_date_str} (channel {channel_id})", + file=sys.stderr, + ) + return False + + +def export_one_day( + channel_id: str, + sanitized_name: str, + date_str: str, + token: str, + overwrite: bool = False, +) -> bool: + """Export one channel for one day. Returns True on success, False on failure. + If overwrite=True, re-export even when the file exists (for refreshing last date). 
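Output path: OUTPUT_BASE/CHANNEL/YYYY/YYYY-MM/YYYY-MM-DD.json under EXPORT_ROOT (the per-day layout from the module docstring).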
+    """
+    year = date_str[:4]
+    month = date_str[5:7]
+    out_dir = _export_data_dir(OUTPUT_BASE) / sanitized_name / year / f"{year}-{month}"
+    out_file = out_dir / f"{date_str}.json"
+
+    if out_file.exists() and not overwrite:
+        print(f"Skip (exists): {sanitized_name} {date_str}", file=sys.stderr)
+        return True
+
+    out_dir.mkdir(parents=True, exist_ok=True)
+    after_utc, before_utc = date_to_utc_range(date_str)
+
+    exe = resolve_cli()
+    cmd = [
+        exe,
+        "export",
+        "--token",
+        token,
+        "--channel",
+        channel_id,
+        "--output",
+        str(out_file),
+        "--format",
+        "Json",
+        "--include-threads",
+        "None",
+        "--markdown",
+        "True",
+        "--respect-rate-limits",
+        "True",
+        "--after",
+        after_utc,
+        "--before",
+        before_utc,
+    ]
+    MAX_RETRIES = 3
+    for i in range(MAX_RETRIES):
+        try:
+            proc = subprocess.run(
+                cmd,
+                cwd=SCRIPT_DIR,
+                capture_output=True,
+                text=True,
+                timeout=300,
+                stdin=subprocess.DEVNULL,
+            )
+            if proc.returncode != 0:
+                print(
+                    f"FAILED: {sanitized_name} {date_str} (channel {channel_id})",
+                    file=sys.stderr,
+                )
+                if proc.stderr:
+                    print(proc.stderr, file=sys.stderr)
+                continue
+            print(f"OK: {sanitized_name} {date_str}", file=sys.stderr)
+            return True
+        except subprocess.TimeoutExpired:
+            print(
+                f"TIMEOUT: {sanitized_name} {date_str} (channel {channel_id})",
+                file=sys.stderr,
+            )
+            if i < MAX_RETRIES - 1:
+                print(f"Retrying... ({i + 1}/{MAX_RETRIES})", file=sys.stderr)
+            else:
+                print(
+                    f"FAILED: {sanitized_name} {date_str} (channel {channel_id})",
+                    file=sys.stderr,
+                )
+
+    return False
+
+
+def export_channel_days(args):
+    """Export for one channel. args = (channel_id, sanitized_name, dates_or_chunks, token, overwrite=False).
+    dates_or_chunks: list of date strings (per-day) or list of (start_date, end_date) tuples (chunk mode).
+    """
+    if len(args) == 5:
+        channel_id, sanitized_name, dates_or_chunks, token, overwrite = args
+    else:
+        channel_id, sanitized_name, dates_or_chunks, token = args
+        overwrite = False
+    failed = False
+    use_chunks = dates_or_chunks and isinstance(dates_or_chunks[0], tuple)
+    if use_chunks:
+        for start_d, end_d in dates_or_chunks:
+            if not export_one_chunk(
+                channel_id,
+                sanitized_name,
+                start_d,
+                end_d,
+                token,
+                overwrite=overwrite,
+            ):
+                failed = True
+                break
+    else:
+        for date_str in dates_or_chunks:
+            if not export_one_day(
+                channel_id,
+                sanitized_name,
+                date_str,
+                token,
+                overwrite=overwrite,
+            ):
+                failed = True
+                break
+    return (channel_id, sanitized_name, failed)
+
+
+def main():
+    if not TOKENS:
+        print("ERROR: Set TOKEN or TOKENS (env or edit script).", file=sys.stderr)
+        sys.exit(1)
+
+    print(
+        f"EXPORT_ROOT={EXPORT_ROOT}\n"
+        f"INPUT (read/resume)={_export_data_dir(INPUT_BASE)}\n"
+        f"OUTPUT (write)={_export_data_dir(OUTPUT_BASE)}",
+        file=sys.stderr,
+    )
+
+    end = END_DATE or datetime.now().strftime("%Y-%m-%d")
+    start_dt = datetime.strptime(START_DATE, "%Y-%m-%d")
+    end_dt = datetime.strptime(end, "%Y-%m-%d")
+
+    dates_list = []
+    d = start_dt
+    while d <= end_dt:
+        dates_list.append(d.strftime("%Y-%m-%d"))
+        d += timedelta(days=1)
+
+    print("Fetching channel list...", file=sys.stderr)
+    channels = fetch_channels(TOKENS[0])
+    print(
+        f"Found {len(channels)} channels. Using {len(TOKENS)} token(s).",
+        file=sys.stderr,
+    )
+
+    chunk_mode = EXPORT_CHUNK_DAYS > 1
+    if chunk_mode:
+        chunks_list = get_chunks_from_dates(dates_list, EXPORT_CHUNK_DAYS)
+        print(
+            f"Chunk mode: {EXPORT_CHUNK_DAYS} days per file (from_date_to_date.json). {len(chunks_list)} chunk(s).",
+            file=sys.stderr,
+        )
+
+    if REFRESH_LAST_DATE:
+        # Re-fetch only the last date/chunk per channel (overwrite).
+        work = []
+        for i, (cid, cname) in enumerate(channels):
+            if chunk_mode:
+                last_chunk = get_last_export_chunk(cname)
+                if not last_chunk:
+                    continue
+                token = TOKENS[i % len(TOKENS)]
+                work.append((cid, cname, [last_chunk], token, True))
+            else:
+                last_date = get_last_export_date(cname)
+                if not last_date:
+                    continue
+                token = TOKENS[i % len(TOKENS)]
+                work.append((cid, cname, [last_date], token, True))
+        print(
+            f"REFRESH_LAST_DATE: re-exporting last {'chunk' if chunk_mode else 'day'} for {len(work)} channel(s).",
+            file=sys.stderr,
+        )
+    elif FETCH_FROM_LAST:
+        # Reuse the channel list, find each channel's last exported date,
+        # then fetch from that date (inclusive) through today.
+        today_str = datetime.now().strftime("%Y-%m-%d")
+        work = []
+        for i, (cid, cname) in enumerate(channels):
+            last_date = get_last_export_date(cname)
+            if last_date is None:
+                last_date = START_DATE
+            start_dt = datetime.strptime(last_date, "%Y-%m-%d")
+            end_dt = datetime.strptime(today_str, "%Y-%m-%d")
+            if start_dt > end_dt:
+                continue
+            dates_from_last = []
+            d = start_dt
+            while d <= end_dt:
+                dates_from_last.append(d.strftime("%Y-%m-%d"))
+                d += timedelta(days=1)
+            if not dates_from_last:
+                continue
+            token = TOKENS[i % len(TOKENS)]
+            if chunk_mode:
+                chunks = get_chunks_from_dates(dates_from_last, EXPORT_CHUNK_DAYS)
+                work.append((cid, cname, chunks, token, True))
+            else:
+                work.append((cid, cname, dates_from_last, token, True))
+        print(
+            f"FETCH_FROM_LAST: fetching from last date (incl.) to today for {len(work)} channel(s).",
+            file=sys.stderr,
+        )
+    else:
+        print(
+            f"Date range: {START_DATE} to {end} ({len(dates_list)} days). Running up to {PARALLEL} channels in parallel.",
+            file=sys.stderr,
+        )
+        work = []
+        for i, (cid, cname) in enumerate(channels):
+            if chunk_mode:
+                missing = get_missing_chunks(cname, chunks_list)
+                if not missing:
+                    continue
+                token = TOKENS[i % len(TOKENS)]
+                work.append((cid, cname, missing, token))
+            else:
+                missing = get_missing_dates(cname, dates_list)
+                if not missing:
+                    continue
+                token = TOKENS[i % len(TOKENS)]
+                work.append((cid, cname, missing, token))
+
+    if chunk_mode:
+        total_chunks = len(channels) * len(chunks_list)
+    else:
+        total_chunks = len(channels) * len(dates_list)
+    already_done = total_chunks - sum(len(w[2]) for w in work)
+    if already_done and not REFRESH_LAST_DATE and not FETCH_FROM_LAST:
+        unit = "chunk(s)" if chunk_mode else "channel-day(s)"
+        print(
+            f"Resume: {already_done} {unit} already exported; {sum(len(w[2]) for w in work)} left.",
+            file=sys.stderr,
+        )
+    if not work:
+        print(
+            "Nothing to do (all channel-days already exported).",
+            file=sys.stderr,
+        )
+        return
+
+    failed_channels = []
+    if PARALLEL <= 1:
+        for item in work:
+            cid, cname = item[0], item[1]
+            print(f"Channel {cid} ({cname})...", file=sys.stderr)
+            _, __, failed = export_channel_days(item)
+            if failed:
+                failed_channels.append(cid)
+    else:
+        with ThreadPoolExecutor(max_workers=PARALLEL) as executor:
+            futures = {executor.submit(export_channel_days, a): a for a in work}
+            for future in as_completed(futures):
+                cid, cname, failed = future.result()
+                if failed:
+                    failed_channels.append(cid)
+
+    if failed_channels:
+        print(
+            f"Some channel(s) could not be exported (e.g. forbidden = no read permission). Channel IDs: {failed_channels}",
+            file=sys.stderr,
+        )
+        if FAIL_ON_CHANNEL_ERROR:
+            sys.exit(1)
+    print("Done.", file=sys.stderr)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/discord_activity_tracker/preprocessors/__init__.py b/discord_activity_tracker/preprocessors/__init__.py
new file mode 100644
index 0000000..84f7ad1
--- /dev/null
+++ b/discord_activity_tracker/preprocessors/__init__.py
@@ -0,0 +1 @@
+"""Pinecone preprocessors for discord_activity_tracker."""
diff --git a/discord_activity_tracker/preprocessors/discord_preprocessor.py b/discord_activity_tracker/preprocessors/discord_preprocessor.py
new file mode 100644
index 0000000..5801fb7
--- /dev/null
+++ b/discord_activity_tracker/preprocessors/discord_preprocessor.py
@@ -0,0 +1,132 @@
+"""
+Pinecone preprocess for Discord messages.
+
+See docs/Pinecone_preprocess_guideline.md
+"""
+
+from __future__ import annotations
+
+import logging
+import re
+from datetime import datetime
+from typing import Any
+
+from django.conf import settings
+from discord_activity_tracker.models import DiscordMessage
+
+logger = logging.getLogger(__name__)
+
+# Defaults when settings omit explicit app type / namespace
+APP_TYPE = "discord-together-cpp"
+NAMESPACE = "discord-cplusplus"
+
+
+def _normalize_failed_ids(failed_ids: list[str]) -> list[str]:
+    seen: set[str] = set()
+    out: list[str] = []
+    for raw in failed_ids or []:
+        value = (raw or "").strip()
+        if not value or value in seen:
+            continue
+        seen.add(value)
+        out.append(value)
+    return out
+
+
+def _clean_discord_text(text: str) -> str:
+    if not text:
+        return ""
+    text = re.sub(r"<@!?(\d+)>", r"@user-\1", text)
+    text = re.sub(r"<#(\d+)>", r"#channel-\1", text)
+    text = re.sub(r"<a?:([^:>]+):\d+>", r":\1:", text)  # custom emoji <a:name:id> -> :name:
+    text = re.sub(r"\s+", " ", text).strip()
+    return text
+
+
+def _message_text(msg: DiscordMessage) -> str:
+    parts = []
+    if msg.author:
+        who = msg.author.display_name or msg.author.username or "unknown"
+        parts.append(f"@{who}")
+    parts.append(_clean_discord_text(msg.content or ""))
+    if msg.attachment_urls:
+        parts.append(
+            "Attachments: " + ", ".join(str(u) for u in msg.attachment_urls[:5])
+        )
+    return "\n".join(p for p in parts if p)
+
+
+def preprocess_discord_for_pinecone(
+    failed_ids: list[str],
+    final_sync_at: datetime | None,
+) -> tuple[list[dict[str, Any]], bool]:
+    """
+    Build Pinecone documents from DiscordMessage rows.
+
+    Incremental sync uses ``updated_at`` vs ``final_sync_at``. Retries use PK in
+    ``failed_ids``.
+    """
+    normalized_failed = _normalize_failed_ids(failed_ids)
+    min_len = int(getattr(settings, "PINECONE_MIN_TEXT_LENGTH", 50) or 50)
+
+    qs = DiscordMessage.objects.filter(is_deleted=False).select_related(
+        "author", "channel", "channel__server"
+    )
+
+    messages_new: list[DiscordMessage] = []
+    messages_failed: list[DiscordMessage] = []
+
+    if final_sync_at is None and not normalized_failed:
+        messages_new = list(qs.order_by("message_created_at"))
+        logger.info(
+            "Discord Pinecone preprocess: first sync, %d messages",
+            len(messages_new),
+        )
+    else:
+        if final_sync_at is not None:
+            messages_new = list(
+                qs.filter(updated_at__gt=final_sync_at).order_by("message_created_at")
+            )
+            logger.info(
+                "Discord Pinecone preprocess: incremental, %d messages",
+                len(messages_new),
+            )
+        if normalized_failed:
+            pks = []
+            for x in normalized_failed:
+                try:
+                    pks.append(int(x))
+                except ValueError:
+                    continue
+            if pks:
+                messages_failed = list(qs.filter(pk__in=pks))
+                logger.info(
+                    "Discord Pinecone preprocess: retry failed, %d messages",
+                    len(messages_failed),
+                )
+
+    seen_pk: set[int] = set()
+    docs: list[dict[str, Any]] = []
+    for msg in messages_new + messages_failed:
+        if msg.pk in seen_pk:
+            continue
+        seen_pk.add(msg.pk)
+        content = _message_text(msg)
+        if len(content.strip()) < min_len:
+            continue
+        ch = msg.channel
+        server = ch.server if ch else None
+        doc_id = f"discord-msg-{msg.message_id}"
+        metadata: dict[str, Any] = {
+            "doc_id": doc_id,
+            "source_ids": str(msg.pk),
+            "type": "discord",
+            "channel_name": ch.channel_name if ch else "",
+            "channel_id": str(ch.channel_id) if ch else "",
+            "server_name": server.server_name if server else "",
+            "message_id": str(msg.message_id),
+        }
+        docs.append({"content": content, "metadata": metadata})
+
+    logger.info("Discord Pinecone preprocess: built %d documents", len(docs))
+    return docs, False
diff --git a/discord_activity_tracker/sync/backfill_paths.py b/discord_activity_tracker/sync/backfill_paths.py
new file mode 100644
index 0000000..61efb0d
--- /dev/null
+++ b/discord_activity_tracker/sync/backfill_paths.py
@@ -0,0 +1,103 @@
+"""Path ordering and date-window filtering for Discussion-style DiscordChatExporter JSON trees."""
+
+from __future__ import annotations
+
+from datetime import date, datetime
+from pathlib import Path
+from typing import Iterator, Optional
+
+
+def is_resource_fork_json(path: Path) -> bool:
+    return path.name.startswith("._") or any(p.startswith("._") for p in path.parts)
+
+
+def discussion_json_sort_key(path: Path) -> tuple:
+    """Sort per-day files by calendar day, then chunk files by end date then start date."""
+    if is_resource_fork_json(path):
+        return ("z", "", "", str(path))
+    stem = path.stem
+    if "_to_" in stem:
+        a, b = stem.split("_to_", 1)
+        if (
+            len(a) == 10
+            and len(b) == 10
+            and a[4] == "-"
+            and a[7] == "-"
+            and b[4] == "-"
+            and b[7] == "-"
+        ):
+            return ("1", b, a, str(path))
+    if len(stem) == 10 and stem[4] == "-" and stem[7] == "-":
+        return ("0", stem, "", str(path))
+    return ("2", stem, "", str(path))
+
+
+def iter_discussion_json_files(root: Path) -> Iterator[Path]:
+    if not root.is_dir():
+        return
+    for p in sorted(root.rglob("*.json"), key=discussion_json_sort_key):
+        if is_resource_fork_json(p):
+            continue
+        yield p
+
+
+def _as_date(d: Optional[datetime]) -> Optional[date]:
+    if d is None:
+        return None
+    return d.date() if isinstance(d, datetime) else None
+
+
+def _day_in_window(
+    since: Optional[datetime],
+    until: Optional[datetime],
+    day: date,
+) -> bool:
+    sd = _as_date(since)
+    ud = _as_date(until)
+    if sd and day < sd:
+        return False
+    if ud and day > ud:
+        return False
+    return True
+
+
+def _range_overlaps(
+    since: Optional[datetime],
+    until: Optional[datetime],
+    start_day: date,
+    end_day: date,
+) -> bool:
+    sd = _as_date(since)
+    ud = _as_date(until)
+    if sd and end_day < sd:
+        return False
+    if ud and start_day > ud:
+        return False
+    return True
+
+
+def json_path_in_date_window(
+    path: Path,
+    since: Optional[datetime],
+    until: Optional[datetime],
+) -> bool:
+    """Include file when its filename encodes a day or chunk range overlapping ``since``/``until``."""
+    if since is None and until is None:
+        return True
+    stem = path.stem
+    if "_to_" in stem:
+        parts = stem.split("_to_", 1)
+        if len(parts) == 2 and len(parts[0]) == 10 and len(parts[1]) == 10:
+            try:
+                d0 = date.fromisoformat(parts[0])
+                d1 = date.fromisoformat(parts[1])
+            except ValueError:
+                return True
+            return _range_overlaps(since, until, d0, d1)
+    if len(stem) == 10:
+        try:
+            d = date.fromisoformat(stem)
+        except ValueError:
+            return True
+        return _day_in_window(since, until, d)
+    return True
diff --git a/discord_activity_tracker/sync/chat_exporter.py b/discord_activity_tracker/sync/chat_exporter.py
index d19d5bd..b854dbc 100644
--- a/discord_activity_tracker/sync/chat_exporter.py
+++ b/discord_activity_tracker/sync/chat_exporter.py
@@ -8,14 +8,17 @@
 from pathlib import Path
 from typing import Optional, List, Dict, Any
 
-from ..workspace import get_workspace_root
+from .dce_cli import ensure_discord_chat_exporter_cli
 
 logger = logging.getLogger(__name__)
 
 
-def _get_cli_path() -> Path:
-    """Resolve CLI path at call time (workspace may not exist at import time)."""
-    return get_workspace_root() / "tools" / "DiscordChatExporter.Cli.exe"
+def _directory_output_arg(output_dir: Path) -> str:
+    """DiscordChatExporter expects a trailing path separator for directory output."""
+    out = str(output_dir.resolve())
+    if not out.endswith(("/", "\\")):
+        out += os.sep
+    return out
 
 
 class DiscordChatExporterError(Exception):
@@ -31,12 +34,12 @@ def export_guild_to_json(
     include_threads: str = "None",
 ) -> List[Path]:
     """Export all channels from a guild. Returns list of JSON file paths."""
-    cli_path = _get_cli_path()
-    if not cli_path.exists():
+    try:
+        cli_path = ensure_discord_chat_exporter_cli()
+    except Exception as e:
         raise DiscordChatExporterError(
-            f"DiscordChatExporter CLI not found at {cli_path}. "
-            "Download it from GitHub and place in workspace/discord_activity_tracker/tools/."
-        )
+            f"DiscordChatExporter CLI not available: {e}"
+        ) from e
 
     output_dir.mkdir(parents=True, exist_ok=True)
 
@@ -48,7 +51,7 @@ def export_guild_to_json(
         "--guild",
         str(guild_id),
         "--output",
-        str(output_dir) + "\\",  # trailing slash = directory output
+        _directory_output_arg(output_dir),
         "--format",
         "Json",
         "--include-threads",
@@ -133,7 +136,7 @@ def parse_exported_json(json_path: Path) -> Dict[str, Any]:
     logger.debug(f"Parsing {json_path.name}")
 
     try:
-        with open(json_path, "r", encoding="utf-8") as f:
+        with open(json_path, "r", encoding="utf-8", errors="replace") as f:
             data = json.load(f)
         return data
diff --git a/discord_activity_tracker/sync/dce_cli.py b/discord_activity_tracker/sync/dce_cli.py
new file mode 100644
index 0000000..ab2fc1f
--- /dev/null
+++ b/discord_activity_tracker/sync/dce_cli.py
@@ -0,0 +1,340 @@
+"""Ensure DiscordChatExporter CLI exists: local tools/, legacy script/, zip (Windows), or git + dotnet build."""
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import platform
+import shutil
+import subprocess
+import sys
+import tempfile
+import urllib.error
+import urllib.request
+import zipfile
+from pathlib import Path
+
+from ..workspace import get_script_dir, get_tools_dir, get_workspace_root
+
+logger = logging.getLogger(__name__)
+
+DCE_GITHUB_REPO = "Tyrrrz/DiscordChatExporter"
+DCE_GIT_URL = f"https://github.com/{DCE_GITHUB_REPO}.git"
+# Pinned release: https://github.com/Tyrrrz/DiscordChatExporter/releases — default 2.47
+DEFAULT_PINNED_VERSION = "2.47"
+
+USER_AGENT = "boost-data-collector/dce_cli (DiscordChatExporter bootstrap)"
+
+# Full RID zip (e.g. 2.47) extracts here so the .exe sits next to its DLLs
+DCE_CLI_BUNDLE_SUBDIR = "DiscordChatExporter-cli"
+
+
+def _pinned_release_tag() -> str:
+    """Git tag for releases API and git clone (e.g. ``2.47``)."""
+    env_val = (os.environ.get("DISCORD_CHAT_EXPORTER_VERSION") or "").strip()
+    if env_val:
+        return env_val
+    try:
+        from django.conf import settings
+
+        if getattr(settings, "configured", False):
+            v = (getattr(settings, "DISCORD_CHAT_EXPORTER_VERSION", "") or "").strip()
+            if v:
+                return v
+    except Exception:
+        pass
+    return DEFAULT_PINNED_VERSION
+
+
+def _release_api_url(tag: str) -> str:
+    return f"https://api.github.com/repos/{DCE_GITHUB_REPO}/releases/tags/{tag}"
+
+
+class DiscordChatExporterCliNotFoundError(RuntimeError):
+    """Raised when the CLI cannot be downloaded or built."""
+
+
+def _win32() -> bool:
+    return sys.platform == "win32"
+
+
+def _cli_exe_name() -> str:
+    return "DiscordChatExporter.Cli.exe" if _win32() else "DiscordChatExporter.Cli"
+
+
+def _find_executable_in_dir(directory: Path, name: str) -> Path | None:
+    """Return the path if it exists and is a regular file."""
+    p = directory / name
+    if p.is_file():
+        return p
+    return None
+
+
+def _find_cli_anywhere(tools: Path, script: Path) -> Path | None:
+    name = _cli_exe_name()
+    for base in (tools, script):
+        if not base.exists():
+            continue
+        found = _find_executable_in_dir(base, name)
+        if found:
+            return found
+    bundle = tools / DCE_CLI_BUNDLE_SUBDIR / name
+    if bundle.is_file():
+        return bundle
+    return None
+
+
+def _dotnet_rid() -> str:
+    system = platform.system()
+    machine = platform.machine().lower()
+    if system == "Windows":
+        return "win-x64"
+    if system == "Darwin":
+        return "osx-arm64" if machine in ("arm64", "aarch64") else "osx-x64"
+    return "linux-x64"
+
+
+def _which(cmd: str) -> str | None:
+    return shutil.which(cmd)
+
+
+def _http_get_json(url: str) -> dict:
+    req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
+    with urllib.request.urlopen(req, timeout=120) as resp:
+        return json.loads(resp.read().decode("utf-8"))
+
+
+def _download_file(url: str, dest: Path) -> None:
+    dest.parent.mkdir(parents=True, exist_ok=True)
+    req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
+    with urllib.request.urlopen(req, timeout=600) as resp:
+        dest.write_bytes(resp.read())
+
+
+def _extract_cli_zip_bundle(zip_path: Path, tools_dir: Path) -> Path:
+    """
+    Extract a RID-specific CLI zip (flat layout) so DiscordChatExporter.Cli.exe
+    sits next to its dependencies under tools/DiscordChatExporter-cli/.
+ """ + bundle = tools_dir / DCE_CLI_BUNDLE_SUBDIR + if bundle.exists(): + shutil.rmtree(bundle) + bundle.mkdir(parents=True, exist_ok=True) + target_name = "DiscordChatExporter.Cli.exe" + with zipfile.ZipFile(zip_path, "r") as zf: + zf.extractall(bundle) + exe = bundle / target_name + if not exe.is_file(): + raise DiscordChatExporterCliNotFoundError( + f"No {target_name} found inside {zip_path} after extract" + ) + return exe + + +def _extract_cli_exe_only_from_zip(zip_path: Path, dest_dir: Path) -> Path: + """Legacy: single nested or flat zip where copying only the .exe is enough.""" + dest_dir.mkdir(parents=True, exist_ok=True) + target_name = "DiscordChatExporter.Cli.exe" + with zipfile.ZipFile(zip_path, "r") as zf: + member = None + for name in zf.namelist(): + if name.endswith(target_name): + member = name + break + if member is None: + raise DiscordChatExporterCliNotFoundError( + f"No {target_name} found inside {zip_path}" + ) + data = zf.read(member) + out = dest_dir / target_name + out.write_bytes(data) + return out + + +def _clone_repo(vendor_dir: Path, *, tag: str) -> None: + if vendor_dir.exists(): + return + vendor_dir.parent.mkdir(parents=True, exist_ok=True) + git = _which("git") + if not git: + raise DiscordChatExporterCliNotFoundError( + "git not found in PATH; install Git to build DiscordChatExporter from source." + ) + subprocess.run( + [ + git, + "clone", + "-b", + tag, + "--depth", + "1", + DCE_GIT_URL, + str(vendor_dir), + ], + check=True, + capture_output=True, + text=True, + ) + + +def _dotnet_publish(vendor_dir: Path, tools_dir: Path) -> Path: + dotnet = _which("dotnet") + if not dotnet: + raise DiscordChatExporterCliNotFoundError( + ".NET SDK not found in PATH; install .NET SDK to build DiscordChatExporter, " + "or place the CLI manually under workspace/discord_activity_tracker/tools/." + ) + csproj = vendor_dir / "DiscordChatExporter.Cli" / "DiscordChatExporter.Cli.csproj" + if not csproj.is_file(): + raise DiscordChatExporterCliNotFoundError( + f"Expected project at {csproj} after clone." 
+        )
+    rid = _dotnet_rid()
+    publish_out = tools_dir / "dce_publish"
+    if publish_out.exists():
+        shutil.rmtree(publish_out)
+    publish_out.mkdir(parents=True, exist_ok=True)
+    cmd = [
+        dotnet,
+        "publish",
+        str(csproj),
+        "-c",
+        "Release",
+        "-r",
+        rid,
+        "--self-contained",
+        "true",
+        "-o",
+        str(publish_out),
+    ]
+    logger.info("Building DiscordChatExporter CLI: %s", " ".join(cmd))
+    subprocess.run(cmd, check=True, capture_output=True, text=True)
+    name = _cli_exe_name()
+    exe = publish_out / name
+    if exe.is_file():
+        return exe
+    for p in publish_out.rglob(name):
+        if p.is_file():
+            return p
+    raise DiscordChatExporterCliNotFoundError(
+        f"dotnet publish succeeded but {name} not found under {publish_out}"
+    )
+
+
+def _preferred_cli_zip_name() -> str | None:
+    """Release asset name for prebuilt CLI (Tyrrrz 2.4x+ ships per-RID zips)."""
+    if not _win32():
+        return None
+    machine = platform.machine().lower()
+    if machine in ("arm64", "aarch64"):
+        return "DiscordChatExporter.Cli.win-arm64.zip"
+    return "DiscordChatExporter.Cli.win-x64.zip"
+
+
+def _try_download_release_zip(tools_dir: Path) -> Path | None:
+    """Download pinned release zip and extract Windows CLI; returns path or None if not applicable."""
+    if not _win32():
+        return None
+    tag = _pinned_release_tag()
+    api_url = _release_api_url(tag)
+    try:
+        data = _http_get_json(api_url)
+    except (urllib.error.URLError, json.JSONDecodeError, TimeoutError) as e:
+        logger.warning("Could not fetch GitHub release %s (%s): %s", tag, api_url, e)
+        return None
+    assets = data.get("assets") or []
+    name_to_url = {
+        (a.get("name") or ""): a.get("browser_download_url")
+        for a in assets
+        if a.get("name") and a.get("browser_download_url")
+    }
+
+    zip_url = None
+    chosen_asset_name: str | None = None
+    preferred = _preferred_cli_zip_name()
+    if preferred and preferred in name_to_url:
+        zip_url = name_to_url[preferred]
+        chosen_asset_name = preferred
+    if not zip_url:
+        # Older releases: one umbrella .zip containing Cli.exe
+        for a in assets:
+            an = (a.get("name") or "").lower()
+            if an.endswith(".zip") and "discord" in an and "exporter" in an:
+                zip_url = a.get("browser_download_url")
+                chosen_asset_name = a.get("name")
+                break
+    if not zip_url:
+        for a in assets:
+            if (a.get("name") or "").lower().endswith(".zip"):
+                zip_url = a.get("browser_download_url")
+                chosen_asset_name = a.get("name")
+                break
+    if not zip_url:
+        return None
+    tools_dir.mkdir(parents=True, exist_ok=True)
+    with tempfile.TemporaryDirectory() as tmp:
+        zpath = Path(tmp) / "dce.zip"
+        logger.info(
+            "Downloading DiscordChatExporter %s asset %s from GitHub",
+            tag,
+            chosen_asset_name or "zip",
+        )
+        _download_file(zip_url, zpath)
+        # Per-RID zips (2.4x+) are a flat bundle of exe + DLLs — extract all.
+        ca = (chosen_asset_name or "").lower()
+        if ca.startswith("discordchatexporter.cli.") and ca.endswith(".zip"):
+            return _extract_cli_zip_bundle(zpath, tools_dir)
+        return _extract_cli_exe_only_from_zip(zpath, tools_dir)
+
+
+def ensure_discord_chat_exporter_cli() -> Path:
+    """
+    Return path to DiscordChatExporter CLI, installing under workspace/tools/ if missing.
+
+    Resolution order:
+    1. tools/ or script/ (existing install or symlink)
+    2. Windows: pinned GitHub release .zip (see ``DISCORD_CHAT_EXPORTER_VERSION``, default 2.47)
+    3. git clone (same tag) + dotnet publish (non-Windows or if zip path unavailable)
+    """
+    tools_dir = get_tools_dir()
+    script_dir = get_script_dir()
+    existing = _find_cli_anywhere(tools_dir, script_dir)
+    if existing:
+        logger.debug("Using existing DiscordChatExporter CLI at %s", existing)
+        return existing
+
+    tools_dir.mkdir(parents=True, exist_ok=True)
+
+    if _win32():
+        try:
+            exe = _try_download_release_zip(tools_dir)
+            if exe and exe.is_file():
+                logger.info(
+                    "Installed DiscordChatExporter CLI from release zip: %s", exe
+                )
+                return exe
+        except Exception as e:
+            logger.warning("Release zip install failed: %s", e)
+
+    vendor_dir = get_workspace_root() / "vendor" / "DiscordChatExporter"
+    try:
+        _clone_repo(vendor_dir, tag=_pinned_release_tag())
+        exe = _dotnet_publish(vendor_dir, tools_dir)
+        final = tools_dir / _cli_exe_name()
+        if exe.resolve() != final.resolve():
+            shutil.copy2(exe, final)
+        if not _win32():
+            final.chmod(final.stat().st_mode | 0o111)
+        logger.info("Built DiscordChatExporter CLI: %s", final)
+        return final
+    except subprocess.CalledProcessError as e:
+        stderr = (e.stderr or "") + (e.stdout or "")
+        raise DiscordChatExporterCliNotFoundError(
+            f"Failed to build DiscordChatExporter CLI: {stderr[:2000]}"
+        ) from e
+
+
+def get_dce_cli_path() -> Path:
+    """Return CLI path, calling ensure_discord_chat_exporter_cli if needed."""
+    return ensure_discord_chat_exporter_cli()
diff --git a/discord_activity_tracker/sync/importer.py b/discord_activity_tracker/sync/importer.py
new file mode 100644
index 0000000..cd9cf24
--- /dev/null
+++ b/discord_activity_tracker/sync/importer.py
@@ -0,0 +1,90 @@
+"""Import DiscordChatExporter JSON payloads into the database (shared by exporter + backfill)."""
+
+from __future__ import annotations
+
+import logging
+from typing import Any, Dict, List, Optional
+
+from asgiref.sync import sync_to_async
+
+from discord_activity_tracker.services import (
+    get_or_create_discord_server,
+    get_or_create_discord_channel,
+    update_channel_last_synced,
+    update_channel_last_activity,
+)
+from discord_activity_tracker.sync.chat_exporter import convert_exporter_message_to_dict
+from discord_activity_tracker.sync.messages import _process_messages_in_batches
+from discord_activity_tracker.sync.utils import parse_datetime
+
+logger = logging.getLogger(__name__)
+
+
+async def persist_exporter_channel_payloads(
+    parsed_data: List[Dict[str, Any]],
+    *,
+    expected_guild_id: Optional[int] = None,
+) -> None:
+    """
+    Persist parsed DiscordChatExporter channel dicts.
+
+    Each item should have keys ``guild``, ``channel``, ``messages`` (same shape as
+    ``parse_exported_json`` output). Optional ``expected_guild_id`` skips rows whose
+    ``guild['id']`` does not match (logs a warning).
+    """
+    for channel_data in parsed_data:
+        try:
+            guild_info = channel_data["guild"]
+            channel_info = channel_data["channel"]
+            messages = channel_data["messages"]
+
+            gid = guild_info.get("id")
+            if expected_guild_id is not None and gid != expected_guild_id:
+                logger.warning(
+                    "Skipping JSON for guild %s (expected %s) channel %s",
+                    gid,
+                    expected_guild_id,
+                    channel_info.get("name"),
+                )
+                continue
+
+            server, _ = await sync_to_async(get_or_create_discord_server)(
+                server_id=guild_info["id"],
+                server_name=guild_info["name"],
+                icon_url="",
+            )
+
+            channel, _ = await sync_to_async(get_or_create_discord_channel)(
+                server=server,
+                channel_id=channel_info["id"],
+                channel_name=channel_info["name"],
+                channel_type=channel_info.get("type", "text"),
+                topic=channel_info.get("topic") or "",
+                position=0,
+            )
+
+            converted = [convert_exporter_message_to_dict(msg) for msg in messages]
+
+            processed = await _process_messages_in_batches(channel, converted)
+
+            if messages:
+                last_msg = convert_exporter_message_to_dict(messages[-1])
+                last_time = parse_datetime(last_msg.get("created_at"))
+                if last_time:
+                    await sync_to_async(update_channel_last_activity)(
+                        channel, last_time
+                    )
+
+            await sync_to_async(update_channel_last_synced)(channel)
+
+            logger.info(
+                "Synced #%s: %s/%s messages",
+                channel.channel_name,
+                processed,
+                len(messages),
+            )
+
+        except Exception as e:
+            ch_name = channel_data.get("channel", {}).get("name")
+            logger.error("Failed to persist channel %s: %s", ch_name, e)
+            continue
diff --git a/discord_activity_tracker/tests/fixtures/backfill_minimal.json b/discord_activity_tracker/tests/fixtures/backfill_minimal.json
new file mode 100644
index 0000000..dd287c7
--- /dev/null
+++ b/discord_activity_tracker/tests/fixtures/backfill_minimal.json
@@ -0,0 +1,27 @@
+{
+  "guild": {
+    "id": 331718482485837825,
+    "name": "Together C & C++"
+  },
+  "channel": {
+    "id": 851121440425639956,
+    "name": "c-cpp-discussion",
+    "type": 0,
+    "topic": ""
+  },
+  "messages": [
+    {
+      "id": 1900000000000000001,
+      "timestamp": "2024-06-01T15:30:00+00:00",
+      "author": {
+        "id": 90001,
+        "name": "backfill_test_user",
+        "nickname": null,
+        "isBot": false
+      },
+      "content": "minimal backfill fixture message",
+      "attachments": [],
+      "reactions": []
+    }
+  ]
+}
diff --git a/discord_activity_tracker/tests/test_backfill_discord_json.py b/discord_activity_tracker/tests/test_backfill_discord_json.py
new file mode 100644
index 0000000..191bec8
--- /dev/null
+++ b/discord_activity_tracker/tests/test_backfill_discord_json.py
@@ -0,0 +1,52 @@
+"""Tests for backfill JSON import."""
+
+import json
+import shutil
+from pathlib import Path
+
+import pytest
+from django.core.management import call_command
+
+from discord_activity_tracker.models import DiscordMessage
+
+
+FIXTURE = Path(__file__).resolve().parent / "fixtures" / "backfill_minimal.json"
+
+
+@pytest.mark.django_db
+class TestBackfillDiscordJson:
+    def test_persist_fixture_creates_message(self, settings):
+        settings.DISCORD_SERVER_ID = "331718482485837825"
+        import asyncio
+
+        from discord_activity_tracker.sync.importer import (
+            persist_exporter_channel_payloads,
+        )
+
+        data = json.loads(FIXTURE.read_text(encoding="utf-8"))
+        payload = {
+            "guild": data["guild"],
+            "channel": data["channel"],
+            "messages": data["messages"],
+        }
+        asyncio.run(
+            persist_exporter_channel_payloads(
+                [payload],
+                expected_guild_id=331718482485837825,
+            )
+        )
+        assert DiscordMessage.objects.filter(message_id=1900000000000000001).exists()
+
+    def test_backfill_command_imports_file(self, settings, tmp_path):
+        settings.DISCORD_SERVER_ID = "331718482485837825"
+        # Layout year matches the fixture's 2024-06-01 message timestamp.
+        layout = tmp_path / "2024" / "2024-06"
+        layout.mkdir(parents=True)
+        target = layout / "2024-06-01.json"
+        shutil.copy(FIXTURE, target)
+
+        call_command(
+            "backfill_discord_json",
+            path=str(tmp_path),
+            guild_id=331718482485837825,
+        )
+        assert DiscordMessage.objects.filter(message_id=1900000000000000001).exists()
diff --git a/discord_activity_tracker/tests/test_backfill_paths.py b/discord_activity_tracker/tests/test_backfill_paths.py
new file mode 100644
index 0000000..dbc8c0a
--- /dev/null
+++ b/discord_activity_tracker/tests/test_backfill_paths.py
@@ -0,0 +1,51 @@
+"""Tests for backfill path ordering and date windows."""
+
+from datetime import datetime
+from pathlib import Path
+
+
+from discord_activity_tracker.sync.backfill_paths import (
+    discussion_json_sort_key,
+    iter_discussion_json_files,
+    json_path_in_date_window,
+)
+
+
+def test_sort_day_before_chunk(tmp_path: Path):
+    d = tmp_path / "2017" / "2017-06"
+    d.mkdir(parents=True)
+    day = d / "2017-06-02.json"
+    chunk = tmp_path / "2017-06-01_to_2017-06-10.json"
+    day.write_text("{}")
+    chunk.write_text("{}")
+    ordered = list(iter_discussion_json_files(tmp_path))
+    assert ordered[0].stem == "2017-06-02"
+    assert "2017-06-01_to_2017-06-10" in ordered[1].stem
+
+
+def test_skips_resource_fork(tmp_path: Path):
+    (tmp_path / "._x.json").write_text("{}")
+    (tmp_path / "ok.json").write_text("{}")
+    assert list(iter_discussion_json_files(tmp_path)) == [tmp_path / "ok.json"]
+
+
+def test_json_path_in_date_window_day():
+    p = Path("/x/2024-01-15.json")
+    since = datetime(2024, 1, 1)
+    until = datetime(2024, 1, 20)
+    assert json_path_in_date_window(p, since, until) is True
+    assert json_path_in_date_window(p, datetime(2024, 2, 1), None) is False
+
+
+def test_json_path_in_date_window_chunk():
+    p = Path("/x/2024-01-01_to_2024-01-10.json")
+    assert (
+        json_path_in_date_window(p, datetime(2024, 1, 5), datetime(2024, 1, 20)) is True
+    )
+    assert json_path_in_date_window(p, datetime(2024, 1, 15), None) is False
+
+
+def test_discussion_json_sort_key_stable():
+    a = Path("/a/2017-06-01.json")
+    b = Path("/b/2017-06-02.json")
+    assert discussion_json_sort_key(a) < discussion_json_sort_key(b)
diff --git a/discord_activity_tracker/tests/test_bulk_services.py b/discord_activity_tracker/tests/test_bulk_services.py
index d707656..55f775f 100644
--- a/discord_activity_tracker/tests/test_bulk_services.py
+++ b/discord_activity_tracker/tests/test_bulk_services.py
@@ -68,53 +68,54 @@ def channel(server):
 
 
 @pytest.mark.django_db
-class TestBulkUpsertUsers:
-    def test_insert_new_users(self):
-        user_data = [
-            _user(1001, "alice", display="Alice"),
-            _user(1002, "bob", display="Bob", bot=True),
-        ]
-        result = bulk_upsert_discord_users(user_data)
-
-        assert len(result) == 2
-        assert 1001 in result
-        assert 1002 in result
-        assert result[1001].discord_user_id == 1001
-        assert DiscordProfile.objects.count() == 2
-
-    def test_update_existing_users(self):
-        DiscordProfile.objects.create(
-            discord_user_id=1001,
-            type="discord",
-            username="alice_old",
-            display_name="Old",
-            is_bot=False,
-        )
-
-        result = bulk_upsert_discord_users(
-            [_user(1001, "alice_new", display="New Alice")]
-        )
-
-        assert len(result) == 1
-        refreshed = DiscordProfile.objects.get(discord_user_id=1001)
-        assert refreshed.username == "alice_new"
-        assert refreshed.display_name == "New Alice"
-
-    def test_deduplicates_by_user_id(self):
-        user_data = [
-            _user(1001, "first"),
-            _user(1001, "second"),
-        ]
-        result = bulk_upsert_discord_users(user_data)
-
-        assert len(result) == 1
-        assert DiscordProfile.objects.count() == 1
-        # Last-seen wins
-        assert DiscordProfile.objects.get(discord_user_id=1001).username == "second"
-
-    def test_empty_input(self):
-        result = bulk_upsert_discord_users([])
-        assert result == {}
+def test_insert_new_users():
+    user_data = [
+        _user(1001, "alice", display="Alice"),
+        _user(1002, "bob", display="Bob", bot=True),
+    ]
+    result = bulk_upsert_discord_users(user_data)
+
+    assert len(result) == 2
+    assert 1001 in result
+    assert 1002 in result
+    assert result[1001].discord_user_id == 1001
+
+
+@pytest.mark.django_db
+def test_update_existing_users():
+    DiscordProfile.objects.create(
+        discord_user_id=1001,
+        type="discord",
+        username="alice_old",
+        display_name="Old",
+        is_bot=False,
+    )
+
+    result = bulk_upsert_discord_users([_user(1001, "alice_new", display="New Alice")])
+
+    assert len(result) == 1
+    refreshed = DiscordProfile.objects.get(discord_user_id=1001)
+    assert refreshed.username == "alice_new"
+    assert refreshed.display_name == "New Alice"
+
+
+@pytest.mark.django_db
+def test_deduplicates_by_user_id():
+    user_data = [
+        _user(1001, "first"),
+        _user(1001, "second"),
+    ]
+    result = bulk_upsert_discord_users(user_data)
+
+    assert len(result) == 1
+    # Last-seen wins
+    assert DiscordProfile.objects.get(discord_user_id=1001).username == "second"
+
+
+@pytest.mark.django_db
+def test_bulk_upsert_users_empty_input():
+    result = bulk_upsert_discord_users([])
+    assert result == {}
 
 
 # -------------------------------------------------------------------
@@ -123,69 +124,70 @@ def test_empty_input(self):
 
 
 @pytest.mark.django_db
-class TestBulkUpsertMessages:
-    def test_insert_new_messages(self, channel):
-        user_map = bulk_upsert_discord_users([_user(1001, "alice", display="Alice")])
+def test_insert_new_messages(channel):
+    user_map = bulk_upsert_discord_users([_user(1001, "alice", display="Alice")])
+
+    now = datetime(2026, 2, 17, 12, 0, 0, tzinfo=timezone.utc)
+    msg_data = [
+        _msg(5001, 1001, content="Hello world", ts=now),
+        _msg(
+            5002,
+            1001,
+            content="Second message",
+            ts=now,
+            attachments=["https://example.com/file.png"],
+        ),
+    ]
+
+    result = bulk_upsert_discord_messages(msg_data, channel, user_map)
+    assert len(result) == 2
+
+    msg1 = DiscordMessage.objects.get(message_id=5001)
+    assert msg1.content == "Hello world"
+    assert msg1.has_attachments is False
+
+    msg2 = DiscordMessage.objects.get(message_id=5002)
+    assert msg2.has_attachments is True
+    assert msg2.attachment_urls == ["https://example.com/file.png"]
+
+
+@pytest.mark.django_db
+def test_update_existing_messages(channel):
+    user_map = bulk_upsert_discord_users([_user(1001, "alice")])
+    now = datetime(2026, 2, 17, 12, 0, 0, tzinfo=timezone.utc)
+
+    # Insert first
+    bulk_upsert_discord_messages(
+        [_msg(5001, 1001, content="Original", ts=now)],
+        channel,
+        user_map,
+    )
 
-        now = datetime(2026, 2, 17, 12, 0, 0, tzinfo=timezone.utc)
-        msg_data = [
-            _msg(5001, 1001, content="Hello world", ts=now),
+    # Update
+    edited_at = datetime(2026, 2, 17, 13, 0, 0, tzinfo=timezone.utc)
+    bulk_upsert_discord_messages(
+        [
             _msg(
-                5002,
+                5001,
                 1001,
-                content="Second message",
+                content="Edited content",
                 ts=now,
-                attachments=["https://example.com/file.png"],
-            ),
-        ]
-
-        result = bulk_upsert_discord_messages(msg_data, channel, user_map)
-        assert len(result) == 2
-        assert DiscordMessage.objects.count() == 2
-
-        msg1 = DiscordMessage.objects.get(message_id=5001)
-        assert msg1.content == "Hello world"
-        assert msg1.has_attachments is False
-
-        msg2 = DiscordMessage.objects.get(message_id=5002)
-        assert msg2.has_attachments is True
-        assert msg2.attachment_urls == ["https://example.com/file.png"]
-
-    def test_update_existing_messages(self, channel):
-        user_map = bulk_upsert_discord_users([_user(1001, "alice")])
-        now = datetime(2026, 2, 17, 12, 0, 0, tzinfo=timezone.utc)
-
-        # Insert first
-        bulk_upsert_discord_messages(
-            [_msg(5001, 1001, content="Original", ts=now)],
-            channel,
-            user_map,
-        )
-
-        # Update
-        edited_at = datetime(2026, 2, 17, 13, 0, 0, tzinfo=timezone.utc)
-        bulk_upsert_discord_messages(
-            [
-                _msg(
-                    5001,
-                    1001,
-                    content="Edited content",
-                    ts=now,
-                    edited_at=edited_at,
-                )
-            ],
-            channel,
-            user_map,
-        )
+                edited_at=edited_at,
+            )
+        ],
+        channel,
+        user_map,
+    )
 
-        assert DiscordMessage.objects.count() == 1
-        msg = DiscordMessage.objects.get(message_id=5001)
-        assert msg.content == "Edited content"
-        assert msg.message_edited_at == edited_at
+    msg = DiscordMessage.objects.get(message_id=5001)
+    assert msg.content == "Edited content"
+    assert msg.message_edited_at == edited_at
 
-    def test_empty_input(self, channel):
-        result = bulk_upsert_discord_messages([], channel, {})
-        assert result == {}
+
+@pytest.mark.django_db
+def test_bulk_upsert_messages_empty_input(channel):
+    result = bulk_upsert_discord_messages([], channel, {})
+    assert result == {}
 
 
 # -------------------------------------------------------------------
@@ -194,48 +196,47 @@ def test_empty_input(self, channel):
 
 
 @pytest.mark.django_db
-class TestBulkUpsertReactions:
-    def test_insert_reactions(self, channel):
-        user_map = bulk_upsert_discord_users([_user(1001, "alice")])
-        now = datetime(2026, 2, 17, 12, 0, 0, tzinfo=timezone.utc)
-        message_map = bulk_upsert_discord_messages(
-            [_msg(5001, 1001, content="Test", ts=now)],
-            channel,
-            user_map,
-        )
-
-        reaction_data = [
-            {"discord_message_id": 5001, "emoji": "\U0001f44d", "count": 3},
-            {"discord_message_id": 5001, "emoji": "\U0001f389", "count": 1},
-        ]
-        bulk_upsert_discord_reactions(reaction_data, message_map)
-
-        assert DiscordReaction.objects.count() == 2
-        thumbs = DiscordReaction.objects.get(emoji="\U0001f44d")
-        assert thumbs.count == 3
-
-    def test_update_reaction_count(self, channel):
-        user_map = bulk_upsert_discord_users([_user(1001, "alice")])
-        now = datetime(2026, 2, 17, 12, 0, 0, tzinfo=timezone.utc)
-        message_map = bulk_upsert_discord_messages(
-            [_msg(5001, 1001, content="Test", ts=now)],
-            channel,
-            user_map,
-        )
-
-        # Insert
-        bulk_upsert_discord_reactions(
-            [{"discord_message_id": 5001, "emoji": "\U0001f44d", "count": 1}],
-            message_map,
-        )
-        # Update
-        bulk_upsert_discord_reactions(
-            [{"discord_message_id": 5001, "emoji": "\U0001f44d", "count": 5}],
-            message_map,
-        )
-
-        assert DiscordReaction.objects.count() == 1
-        assert DiscordReaction.objects.get(emoji="\U0001f44d").count == 5
+def test_insert_reactions(channel):
+    user_map = bulk_upsert_discord_users([_user(1001, "alice")])
+    now = datetime(2026, 2, 17, 12, 0, 0, tzinfo=timezone.utc)
+    message_map = bulk_upsert_discord_messages(
+        [_msg(5001, 1001, content="Test", ts=now)],
+        channel,
+        user_map,
+    )
+
+    reaction_data = [
+        {"discord_message_id": 5001, "emoji": "\U0001f44d", "count": 3},
+        {"discord_message_id": 5001, "emoji": "\U0001f389", "count": 1},
+    ]
+    bulk_upsert_discord_reactions(reaction_data, message_map)
+
+    thumbs = DiscordReaction.objects.get(emoji="\U0001f44d")
+    assert thumbs.count == 3
+
+
+@pytest.mark.django_db
+def test_update_reaction_count(channel):
+    user_map = bulk_upsert_discord_users([_user(1001, "alice")])
+    now = datetime(2026, 2, 17, 12, 0, 0, tzinfo=timezone.utc)
+    message_map = bulk_upsert_discord_messages(
+        [_msg(5001, 1001, content="Test", ts=now)],
+        channel,
+        user_map,
+    )
+
+    # Insert
+    bulk_upsert_discord_reactions(
+        [{"discord_message_id": 5001, "emoji": "\U0001f44d", "count": 1}],
+        message_map,
+    )
+    # Update
+    bulk_upsert_discord_reactions(
+        [{"discord_message_id": 5001, "emoji": "\U0001f44d", "count": 5}],
+        message_map,
+    )
+
+    assert DiscordReaction.objects.get(emoji="\U0001f44d").count == 5
 
 
 # -------------------------------------------------------------------
@@ -244,73 +245,73 @@ def test_update_reaction_count(self, channel):
 
 
 @pytest.mark.django_db
-class TestBulkProcessMessageBatch:
-    def test_full_batch(self, channel):
-        now = datetime(2026, 2, 17, 12, 0, 0, tzinfo=timezone.utc)
-        messages = [
-            {
-                "message_id": 5001,
-                "author": _user(1001, "alice", display="Alice"),
-                "content": "Hello!",
-                "message_created_at": now,
-                "message_edited_at": None,
-                "reply_to_message_id": None,
-                "attachment_urls": [],
-                "reactions": [
-                    {"emoji": "\U0001f44d", "count": 2},
-                    {"emoji": "\u2764\ufe0f", "count": 1},
-                ],
-            },
-            {
-                "message_id": 5002,
-                "author": _user(1002, "bob", display="Bob"),
-                "content": "Hi there!",
-                "message_created_at": now,
-                "message_edited_at": None,
-                "reply_to_message_id": 5001,
-                "attachment_urls": ["https://example.com/img.png"],
-                "reactions": [],
-            },
-        ]
-
-        count = bulk_process_message_batch(messages, channel)
-
-        assert count == 2
-        assert DiscordProfile.objects.count() == 2
-        assert DiscordMessage.objects.count() == 2
-        assert DiscordReaction.objects.count() == 2
-
-        msg1 = DiscordMessage.objects.get(message_id=5001)
-        assert msg1.content == "Hello!"
- assert msg1.author.username == "alice" - - msg2 = DiscordMessage.objects.get(message_id=5002) - assert msg2.reply_to_message_id == 5001 - assert msg2.has_attachments is True - - def test_empty_batch(self, channel): - count = bulk_process_message_batch([], channel) - assert count == 0 - - def test_idempotent(self, channel): - """Running same batch twice should not create duplicates.""" - now = datetime(2026, 2, 17, 12, 0, 0, tzinfo=timezone.utc) - messages = [ - { - "message_id": 5001, - "author": _user(1001, "alice"), - "content": "Test", - "message_created_at": now, - "message_edited_at": None, - "reply_to_message_id": None, - "attachment_urls": [], - "reactions": [{"emoji": "\U0001f44d", "count": 1}], - }, - ] - - bulk_process_message_batch(messages, channel) - bulk_process_message_batch(messages, channel) - - assert DiscordProfile.objects.count() == 1 - assert DiscordMessage.objects.count() == 1 - assert DiscordReaction.objects.count() == 1 +def test_full_batch(channel): + now = datetime(2026, 2, 17, 12, 0, 0, tzinfo=timezone.utc) + messages = [ + { + "message_id": 5001, + "author": _user(1001, "alice", display="Alice"), + "content": "Hello!", + "message_created_at": now, + "message_edited_at": None, + "reply_to_message_id": None, + "attachment_urls": [], + "reactions": [ + {"emoji": "\U0001f44d", "count": 2}, + {"emoji": "\u2764\ufe0f", "count": 1}, + ], + }, + { + "message_id": 5002, + "author": _user(1002, "bob", display="Bob"), + "content": "Hi there!", + "message_created_at": now, + "message_edited_at": None, + "reply_to_message_id": 5001, + "attachment_urls": ["https://example.com/img.png"], + "reactions": [], + }, + ] + + count = bulk_process_message_batch(messages, channel) + + assert count == 2 + + msg1 = DiscordMessage.objects.get(message_id=5001) + assert msg1.content == "Hello!" 
+    assert msg1.author.username == "alice"
+
+    msg2 = DiscordMessage.objects.get(message_id=5002)
+    assert msg2.reply_to_message_id == 5001
+    assert msg2.has_attachments is True
+
+
+@pytest.mark.django_db
+def test_empty_batch(channel):
+    count = bulk_process_message_batch([], channel)
+    assert count == 0
+
+
+@pytest.mark.django_db
+def test_idempotent(channel):
+    """Running same batch twice should not create duplicates."""
+    now = datetime(2026, 2, 17, 12, 0, 0, tzinfo=timezone.utc)
+    messages = [
+        {
+            "message_id": 5001,
+            "author": _user(1001, "alice"),
+            "content": "Test",
+            "message_created_at": now,
+            "message_edited_at": None,
+            "reply_to_message_id": None,
+            "attachment_urls": [],
+            "reactions": [{"emoji": "\U0001f44d", "count": 1}],
+        },
+    ]
+
+    bulk_process_message_batch(messages, channel)
+    bulk_process_message_batch(messages, channel)
+
+    assert DiscordProfile.objects.count() == 1
+    assert DiscordMessage.objects.count() == 1
+    assert DiscordReaction.objects.count() == 1
diff --git a/discord_activity_tracker/workspace.py b/discord_activity_tracker/workspace.py
index 54f0063..e499c3c 100644
--- a/discord_activity_tracker/workspace.py
+++ b/discord_activity_tracker/workspace.py
@@ -11,6 +11,25 @@ def get_workspace_root() -> Path:
     return get_workspace_path(_APP_SLUG)
 
 
+def get_tools_dir() -> Path:
+    """Return workspace/discord_activity_tracker/tools/ for DiscordChatExporter CLI binaries."""
+    path = get_workspace_root() / "tools"
+    path.mkdir(parents=True, exist_ok=True)
+    return path
+
+
+def get_script_dir() -> Path:
+    """Return workspace/discord_activity_tracker/script/ (legacy export scripts, optional)."""
+    path = get_workspace_root() / "script"
+    path.mkdir(parents=True, exist_ok=True)
+    return path
+
+
+def get_discussion_export_dir() -> Path:
+    """Default tree for per-day exports: Discussion - c-cpp-discussion/ (mkdir only when writing)."""
+    return get_workspace_root() / "Discussion - c-cpp-discussion"
+
+
 def get_raw_dir() -> Path:
     """Return workspace/discord_activity_tracker/raw/ for DiscordChatExporter JSON output."""
     path = get_workspace_root() / "raw"
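
Taken together, the modules in this change compose into a small backfill pipeline: ensure_discord_chat_exporter_cli() provisions the CLI, the per-day export script writes date-named JSON trees, backfill_paths orders and date-filters those files, parse_exported_json reads each one, and persist_exporter_channel_payloads loads the results. A minimal driver sketch under stated assumptions — the date window is illustrative, the guild id is the one used by the test fixture, and Django settings are assumed to be configured (e.g. when run from a management command):

    import asyncio
    from datetime import datetime

    from discord_activity_tracker.sync.backfill_paths import (
        iter_discussion_json_files,
        json_path_in_date_window,
    )
    from discord_activity_tracker.sync.chat_exporter import parse_exported_json
    from discord_activity_tracker.sync.importer import persist_exporter_channel_payloads
    from discord_activity_tracker.workspace import get_discussion_export_dir


    async def backfill(since: datetime | None, until: datetime | None) -> None:
        # Resource-fork files are skipped and per-day files sort before chunk
        # files, so day-level data is imported first when ranges overlap.
        payloads = [
            parse_exported_json(p)
            for p in iter_discussion_json_files(get_discussion_export_dir())
            if json_path_in_date_window(p, since, until)
        ]
        # 331718482485837825 is the "Together C & C++" guild id from the fixture;
        # files from any other guild are skipped with a warning.
        await persist_exporter_channel_payloads(
            payloads, expected_guild_id=331718482485837825
        )


    if __name__ == "__main__":
        # Illustrative window: import everything exported for 2024.
        asyncio.run(backfill(datetime(2024, 1, 1), datetime(2024, 12, 31)))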