Skip to content

Python API Reference

Auto-generated from source code docstrings.

Command-line interface for chess-self-coach.

Entry point for the CLI. Dispatches to subcommands: setup, train, update, syzygy. When run without a subcommand, it launches the app server (the same action as `train --serve`).

main(argv=None)

Main CLI entry point.

Parameters:

Name Type Description Default
argv list[str] | None

Command-line arguments (defaults to sys.argv[1:]).

None
Source code in src/chess_self_coach/cli.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def main(argv: list[str] | None = None) -> None:
    """Main CLI entry point.

    Builds the argument parser, parses ``argv`` and dispatches to the
    selected subcommand (``setup``, ``update``, ``syzygy`` or ``train``).
    When no subcommand is given, the server is launched directly.

    Subcommand modules are imported lazily inside each branch so that only
    the code needed for the chosen command is loaded.

    Args:
        argv: Command-line arguments (defaults to sys.argv[1:]).

    Raises:
        SystemExit: Raised by argparse (``--version``, ``-h``, invalid
            arguments) and with status 1 when a subcommand fails.
    """
    parser = argparse.ArgumentParser(
        prog="chess-self-coach",
        description="Learn from your chess mistakes: Stockfish analysis + spaced repetition training.",
    )
    parser.add_argument(
        "--version", action="version", version=f"%(prog)s {__version__}"
    )

    subparsers = parser.add_subparsers(dest="command", help="Available commands")

    # --- setup ---
    subparsers.add_parser(
        "setup",
        help="Interactive setup: verify Stockfish, configure game platforms",
    )

    # --- update ---
    subparsers.add_parser(
        "update",
        help="Update chess-self-coach to the latest version",
    )

    # --- syzygy ---
    p_syzygy = subparsers.add_parser(
        "syzygy",
        help="Manage Syzygy endgame tablebases",
    )
    p_syzygy.add_argument(
        "action",
        choices=["download", "status"],
        help="download: fetch 3-5 piece tables (~1 GB). status: show installed tables.",
    )

    # --- train ---
    p_train = subparsers.add_parser(
        "train",
        help="Training mode: extract mistakes from games and drill with spaced repetition",
    )
    p_train.add_argument(
        "--prepare",
        action="store_true",
        help="Analyze games and export training_data.json",
    )
    p_train.add_argument(
        "--serve",
        action="store_true",
        help="Open the training PWA in the browser",
    )
    p_train.add_argument(
        "--stats",
        action="store_true",
        help="Show training progress statistics",
    )
    p_train.add_argument(
        "--derive",
        action="store_true",
        help="Re-derive training_data.json from analysis_data.json (no Stockfish needed)",
    )
    p_train.add_argument(
        "--games",
        type=int,
        default=10,
        help="Maximum games to analyze (default: 10)",
    )
    # NOTE(review): --depth is parsed but args.depth is never read in this
    # function; confirm whether it should feed the analysis settings.
    p_train.add_argument(
        "--depth",
        type=int,
        default=18,
        help="Stockfish analysis depth (default: 18)",
    )
    p_train.add_argument(
        "--threads",
        type=int,
        default=None,
        help="Stockfish threads (default: auto = CPU count - 1)",
    )
    p_train.add_argument(
        "--hash",
        type=int,
        default=None,
        dest="hash_mb",
        help="Stockfish hash table size in MB (default: 1024)",
    )
    p_train.add_argument(
        "--reanalyze-all",
        action="store_true",
        dest="reanalyze_all",
        help="Re-analyze all games (skip only those with identical settings)",
    )
    p_train.add_argument(
        "--engine",
        type=str,
        default=None,
        help="Path to the Stockfish binary (overrides config.json)",
    )
    p_train.add_argument(
        "--refresh-explanations",
        action="store_true",
        dest="refresh_explanations",
        help="[Dev] Regenerate explanations without re-running Stockfish",
    )
    # NOTE(review): --fresh is parsed but args.fresh is never read in this
    # function; confirm whether it is still meant to have an effect.
    p_train.add_argument(
        "--fresh",
        action="store_true",
        help="[Dev] Discard existing training data and start from scratch",
    )

    args = parser.parse_args(argv)

    # No subcommand: behave like `train --serve`.
    if args.command is None:
        _launch_server()
        return

    if args.command == "setup":
        _setup()

    elif args.command == "update":
        from chess_self_coach.updater import update

        update()

    elif args.command == "syzygy":
        from chess_self_coach.syzygy import (
            _DEFAULT_DIR as _SYZYGY_DEFAULT,
            download_syzygy,
            syzygy_status,
        )

        if args.action == "download":
            default = str(_SYZYGY_DEFAULT)
            # An empty answer keeps the default installation directory.
            custom = input(f"  Installation directory [{default}]: ").strip()
            target = Path(custom) if custom else _SYZYGY_DEFAULT

            try:
                path = download_syzygy(target_dir=target)
                print(f"  ✓ Syzygy tables downloaded to {path}")
            # Deliberately broad: any download failure is reported as a
            # one-line error. FileNotFoundError is already an Exception
            # subclass, so it does not need to be listed separately.
            except Exception as e:
                print(f"  ❌ {e}", file=sys.stderr)
                sys.exit(1)
        elif args.action == "status":
            from chess_self_coach.config import ConfigError, error_exit, load_config

            try:
                config = load_config()
            except ConfigError as e:
                # error_exit never returns, so `config` is bound below.
                error_exit(str(e), hint=e.hint)
            status = syzygy_status(config)
            if status["found"]:
                print(f"  Path: {status['path']}")
                print(f"  WDL files: {status['wdl_count']}")
                print(f"  DTZ files: {status['dtz_count']}")
                print(f"  Total size: {status['total_size_mb']} MB")
            else:
                print("  No Syzygy tables found.")
                print("  Download with: chess-self-coach syzygy download")

    elif args.command == "train":
        # The mode flags are checked in a fixed priority order; only the
        # first one that is set is acted upon.
        if args.derive:
            from chess_self_coach.training_data import generate_training_data

            try:
                generate_training_data()
            except (FileNotFoundError, RuntimeError) as e:
                print(f"  {e}", file=sys.stderr)
                sys.exit(1)
        elif args.refresh_explanations:
            from chess_self_coach.trainer import refresh_explanations

            refresh_explanations()
        elif args.prepare:
            from chess_self_coach.analysis import AnalysisSettings, analyze_games
            from chess_self_coach.opening_explorer import ExplorerAPIError

            # Build settings from config, with CLI overrides
            from chess_self_coach.config import ConfigError, error_exit, load_config

            try:
                config = load_config()
            except ConfigError as e:
                error_exit(str(e), hint=e.hint)
            settings = AnalysisSettings.from_config(config)
            if args.threads is not None:
                settings.threads = args.threads
            if args.hash_mb is not None:
                settings.hash_mb = args.hash_mb

            try:
                analyze_games(
                    max_games=args.games,
                    reanalyze_all=args.reanalyze_all,
                    settings=settings,
                    engine_path=args.engine,
                )
            except (FileNotFoundError, RuntimeError, ExplorerAPIError) as e:
                print(f"  {e}", file=sys.stderr)
                sys.exit(1)

            # Post-processing steps run only after a successful analysis.
            from chess_self_coach.tactics import run_tactical_analysis

            run_tactical_analysis()

            from chess_self_coach.classifier import run_classification

            run_classification()
        elif args.serve:
            print("  Tip: you can now just run `chess-self-coach` directly.\n")
            _launch_server()
        elif args.stats:
            from chess_self_coach.trainer import print_stats

            print_stats()
        else:
            # `train` without a mode flag: show a short usage reminder.
            print("Usage: chess-self-coach train [--prepare|--derive|--serve|--stats]")
            print("Run 'chess-self-coach train -h' for details.")

Configuration loading for chess-self-coach.

Loads config.json (Stockfish path, player usernames) and .env (Lichess token). Every error produces a clear message with the exact command to fix it.

ConfigError

Bases: Exception

Raised when config.json is missing, invalid, or unreadable.

Attributes:

Name Type Description
hint

Optional fix suggestion for the user.

Source code in src/chess_self_coach/config.py
20
21
22
23
24
25
26
27
28
29
class ConfigError(Exception):
    """Raised when config.json is missing, invalid, or unreadable.

    Attributes:
        hint: Optional fix suggestion for the user.
    """

    def __init__(self, message: str, hint: str | None = None) -> None:
        super().__init__(message)
        self.hint = hint

analysis_data_path()

Return the path to analysis_data.json.

Returns:

Type Description
Path

Path to data/analysis_data.json.

Source code in src/chess_self_coach/config.py
83
84
85
86
87
88
89
def analysis_data_path() -> Path:
    """Locate the analysis data file inside the data directory.

    Returns:
        The data directory joined with the ANALYSIS_DATA_FILE name.
    """
    base_dir = data_dir()
    return base_dir / ANALYSIS_DATA_FILE

check_stockfish_version(sf_path, expected=None)

Check the Stockfish version and warn if it doesn't match expected.

Parameters:

Name Type Description Default
sf_path Path

Path to the Stockfish binary.

required
expected str | None

Expected version string (e.g. "Stockfish 18").

None

Returns:

Type Description
str

The detected version string.

Source code in src/chess_self_coach/config.py
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
def check_stockfish_version(sf_path: Path, expected: str | None = None) -> str:
    """Check the Stockfish version and warn if it doesn't match expected.

    Args:
        sf_path: Path to the Stockfish binary.
        expected: Expected version string (e.g. "Stockfish 18").

    Returns:
        The detected version string.
    """
    try:
        result = subprocess.run(
            [str(sf_path)],
            input="uci\nquit\n",
            capture_output=True,
            text=True,
            timeout=5,
        )
        for line in result.stdout.splitlines():
            if line.startswith("id name "):
                version = line[len("id name ") :]
                if expected and expected not in version:
                    print(
                        f"  ⚠ Warning: Expected {expected}, found {version}",
                        file=sys.stderr,
                    )
                return version
    except (subprocess.TimeoutExpired, OSError) as e:
        print(f"  ⚠ Warning: Could not check Stockfish version: {e}", file=sys.stderr)

    return "unknown"

classifications_data_path()

Return the path to classifications_data.json.

Returns:

Type Description
Path

Path to data/classifications_data.json.

Source code in src/chess_self_coach/config.py
119
120
121
122
123
124
125
def classifications_data_path() -> Path:
    """Locate the classifications data file inside the data directory.

    Returns:
        The data directory joined with the CLASSIFICATIONS_DATA_FILE name.
    """
    base_dir = data_dir()
    return base_dir / CLASSIFICATIONS_DATA_FILE

config_path()

Return the path to config.json.

Returns:

Type Description
Path

Path to data/config.json.

Source code in src/chess_self_coach/config.py
74
75
76
77
78
79
80
def config_path() -> Path:
    """Locate the configuration file inside the data directory.

    Returns:
        The data directory joined with the CONFIG_FILE name.
    """
    base_dir = data_dir()
    return base_dir / CONFIG_FILE

data_dir()

Return the data directory path.

Returns:

Type Description
Path

Path to the data/ directory in the project root.

Source code in src/chess_self_coach/config.py
65
66
67
68
69
70
71
def data_dir() -> Path:
    """Locate the data directory under the project root.

    Returns:
        The project root joined with the DATA_DIR name.
    """
    project_root = _find_project_root()
    return project_root / DATA_DIR

error_exit(message, hint=None, debug_cmd=None)

Print a formatted error and exit.

Parameters:

Name Type Description Default
message str

What went wrong.

required
hint str | None

How to fix it.

None
debug_cmd str | None

A shell command the user can run to debug.

None
Source code in src/chess_self_coach/config.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def error_exit(message: str, hint: str | None = None, debug_cmd: str | None = None) -> NoReturn:
    """Print a formatted error and exit.

    Args:
        message: What went wrong.
        hint: How to fix it.
        debug_cmd: A shell command the user can run to debug.
    """
    print(f"\n{message}", file=sys.stderr)
    if hint:
        print(f"\n  How to fix:\n  {hint}", file=sys.stderr)
    if debug_cmd:
        print(f"\n  To debug manually:\n    {debug_cmd}", file=sys.stderr)
    print(file=sys.stderr)
    sys.exit(1)

fetched_games_path()

Return the path to fetched_games.json.

Returns:

Type Description
Path

Path to data/fetched_games.json.

Source code in src/chess_self_coach/config.py
101
102
103
104
105
106
107
def fetched_games_path() -> Path:
    """Locate the fetched-games cache file inside the data directory.

    Returns:
        The data directory joined with the FETCHED_GAMES_FILE name.
    """
    base_dir = data_dir()
    return base_dir / FETCHED_GAMES_FILE

find_stockfish(config=None)

Find a working Stockfish binary.

Search order: config.json `path` → config.json `fallback_path` → built-in default search paths → `stockfish` on $PATH.

Parameters:

Name Type Description Default
config dict[str, Any] | None

Optional loaded config dict.

None

Returns:

Type Description
Path

Path to the Stockfish binary.

Raises:

Type Description
SystemExit

If no Stockfish binary is found.

Source code in src/chess_self_coach/config.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
def find_stockfish(config: dict[str, Any] | None = None) -> Path:
    """Find a Stockfish binary on disk.

    Search order: config.json ``path`` → config.json ``fallback_path`` →
    built-in default search paths → ``stockfish`` on $PATH. The first
    candidate that is an existing regular file is returned; it is not
    executed or otherwise validated here.

    Args:
        config: Optional loaded config dict. Its ``stockfish`` section is
            read for ``path`` (ignored when empty or "auto") and
            ``fallback_path``.

    Returns:
        Path to the Stockfish binary.

    Raises:
        SystemExit: If no Stockfish binary is found.
    """
    candidates: list[Path] = []

    # Explicitly configured locations take priority over everything else.
    if config:
        sf_config = config.get("stockfish", {})
        path = sf_config.get("path", "")
        if path and path != "auto":
            candidates.append(Path(path))
        if fallback := sf_config.get("fallback_path"):
            candidates.append(Path(fallback))

    # Default search paths
    candidates.extend(_SF_SEARCH_PATHS)

    # $PATH lookup
    sf_in_path = shutil.which("stockfish")
    if sf_in_path:
        candidates.append(Path(sf_in_path))

    # Test each candidate, remembering the rejected ones for the error report.
    tested: list[str] = []
    for candidate in candidates:
        # is_file() is False for missing paths, so no separate exists() check.
        if candidate.is_file():
            return candidate
        tested.append(f"  - {candidate} ({'exists' if candidate.exists() else 'not found'})")

    # The hint points at `train --prepare`, the subcommand that accepts
    # the --engine option.
    error_exit(
        "Stockfish not found.",
        hint=(
            "Paths tested:\n"
            + "\n".join(tested)
            + "\n\n  To fix:\n"
            "  - Install Stockfish: sudo apt install stockfish\n"
            "  - Or specify the path: chess-self-coach train --prepare --engine /path/to/stockfish"
        ),
    )

load_config()

Load config.json from the data directory.

Returns:

Type Description
dict[str, Any]

Parsed config dictionary.

Raises:

Type Description
ConfigError

If config.json is missing or invalid.

Source code in src/chess_self_coach/config.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def load_config() -> dict[str, Any]:
    """Load config.json from the data directory.

    If the file is absent from the data directory but present at the
    project root (its old location), the error hint explains how to move it.

    Returns:
        Parsed config dictionary.

    Raises:
        ConfigError: If config.json is missing, is not valid JSON, or cannot
            be read (I/O error or not UTF-8 encoded).
    """
    cfg = config_path()

    if not cfg.exists():
        # Migration hint: detect old location at project root
        root = _find_project_root()
        old_path = root / CONFIG_FILE
        if old_path.exists():
            raise ConfigError(
                "config.json found at old location (project root).",
                hint=f"Move it to the data directory:\n"
                f"  mkdir -p {root / DATA_DIR}\n"
                f"  mv {old_path} {cfg}",
            )
        raise ConfigError(
            "config.json not found.",
            hint=f"Run 'chess-self-coach setup' to create it,\n"
            f"  or copy {root / DATA_DIR / CONFIG_EXAMPLE_FILE} to {cfg}",
        )

    try:
        # Decode as UTF-8 explicitly instead of relying on the platform's
        # locale encoding, which can mis-read non-ASCII values.
        with open(cfg, encoding="utf-8") as f:
            return json.load(f)
    except json.JSONDecodeError as e:
        raise ConfigError(
            f"config.json is not valid JSON: {e}",
            hint=f"Check the syntax in {cfg}",
        ) from e
    except (OSError, UnicodeDecodeError) as e:
        # Surface read/decoding failures through the same error type that
        # callers already handle, rather than as a raw traceback.
        raise ConfigError(
            f"config.json could not be read: {e}",
            hint=f"Check the permissions and encoding (UTF-8) of {cfg}",
        ) from e

load_lichess_token(required=True)

Load the Lichess API token from .env or environment.

Parameters:

Name Type Description Default
required bool

If True, exit on missing token. If False, return None.

True

Returns:

Type Description
str | None

The API token string, or None if not found and not required.

Raises:

Type Description
SystemExit

If required=True and no token is found or it looks invalid.

Source code in src/chess_self_coach/config.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def load_lichess_token(required: bool = True) -> str | None:
    """Read the Lichess API token from the environment.

    If an env file exists at the project root it is loaded first, then
    ``LICHESS_API_TOKEN`` is read from the process environment and stripped
    of surrounding whitespace. A usable token must start with ``lip_``.

    Args:
        required: If True, exit on a missing or invalid-looking token.
            If False, return None in those cases.

    Returns:
        The API token string, or None if not usable and not required.

    Raises:
        SystemExit: If required=True and no token is found or it looks invalid.
    """
    env_path = _find_project_root() / ENV_FILE
    if env_path.exists():
        load_dotenv(env_path)

    token = os.environ.get("LICHESS_API_TOKEN", "").strip()

    # Happy path: a token with the expected prefix.
    if token.startswith("lip_"):
        return token

    # Anything else is unusable; only complain when the caller needs it.
    if not required:
        return None

    if token:
        error_exit(
            f"Lichess token looks invalid (expected 'lip_...' prefix, got '{token[:8]}...').",
            hint="Regenerate your token at https://lichess.org/account/oauth/token/create",
        )
    else:
        error_exit(
            "Lichess API token not found.",
            hint=(
                "1. Create a token at: https://lichess.org/account/oauth/token/create\n"
                "  2. Save it:\n"
                f'     echo "LICHESS_API_TOKEN=lip_your_token_here" > {env_path}'
            ),
            debug_cmd='curl -H "Authorization: Bearer lip_your_token" https://lichess.org/api/account',
        )

save_config(config)

Write config back to config.json atomically.

Parameters:

Name Type Description Default
config dict[str, Any]

The config dictionary to save.

required
Source code in src/chess_self_coach/config.py
183
184
185
186
187
188
189
190
191
192
193
194
def save_config(config: dict[str, Any]) -> None:
    """Persist the config dictionary to config.json.

    Creates the parent directory if needed, writes the file through
    ``atomic_write_json`` and prints the destination path.

    Args:
        config: The config dictionary to save.
    """
    from chess_self_coach.io import atomic_write_json

    destination = config_path()
    # Make sure the target directory exists before writing.
    destination.parent.mkdir(parents=True, exist_ok=True)
    atomic_write_json(destination, config, pretty=True)
    print(f"  Config saved to {destination}")

tactics_data_path()

Return the path to tactics_data.json.

Returns:

Type Description
Path

Path to data/tactics_data.json.

Source code in src/chess_self_coach/config.py
110
111
112
113
114
115
116
def tactics_data_path() -> Path:
    """Locate the tactics data file inside the data directory.

    Returns:
        The data directory joined with the TACTICS_DATA_FILE name.
    """
    base_dir = data_dir()
    return base_dir / TACTICS_DATA_FILE

training_data_path()

Return the path to training_data.json.

Returns:

Type Description
Path

Path to data/training_data.json.

Source code in src/chess_self_coach/config.py
92
93
94
95
96
97
98
def training_data_path() -> Path:
    """Locate the training data file inside the data directory.

    Returns:
        The data directory joined with the TRAINING_DATA_FILE name.
    """
    base_dir = data_dir()
    return base_dir / TRAINING_DATA_FILE

Phase 1: collect raw per-move data from Stockfish, tablebase, and opening explorer.

Stores all evaluation data in analysis_data.json with maximum granularity. Phase 2 (training_data.py) annotates and filters this data into training_data.json.

AnalysisInterrupted

Bases: Exception

Raised when analysis is cancelled via the interrupt signal.

Source code in src/chess_self_coach/analysis.py
927
928
class AnalysisInterrupted(Exception):
    """Raised when analysis is cancelled via the interrupt signal.

    A plain ``Exception`` subclass: it adds no attributes or behaviour of
    its own and exists only to give the cancellation a distinct type.
    """

AnalysisSettings dataclass

Engine and analysis configuration for full game analysis.

Attributes:

Name Type Description
threads int

Number of Stockfish threads. 0 means auto (cpu_count - 1).

hash_mb int

Stockfish hash table size in megabytes.

limits dict[str, dict[str, float | int]]

Depth/time limits per piece-count bracket.

Source code in src/chess_self_coach/analysis.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
@dataclass
class AnalysisSettings:
    """Engine and analysis configuration for full game analysis.

    Attributes:
        threads: Number of Stockfish threads. 0 means auto, resolved through
            worker_count() by ``resolved_threads``.
        hash_mb: Stockfish hash table size in megabytes.
        limits: Depth/time limits per piece-count bracket. Defaults to a
            shallow copy of ANALYSIS_LIMITS.
    """

    threads: int = 0
    hash_mb: int = 1024
    limits: dict[str, dict[str, float | int]] = field(
        default_factory=lambda: dict(ANALYSIS_LIMITS)
    )

    @classmethod
    def from_config(cls, config: dict) -> AnalysisSettings:
        """Create settings from the 'analysis_engine' section of a config dict.

        Args:
            config: Full config dict (from config.json).

        Returns:
            AnalysisSettings with values from config, defaults for missing keys.
        """
        engine_cfg = config.get("analysis_engine", {})

        # "auto" and 0 both mean "let resolved_threads decide".
        raw_threads = engine_cfg.get("threads", "auto")
        thread_count = 0 if raw_threads in ("auto", 0) else int(raw_threads)

        hash_size = int(engine_cfg.get("hash_mb", 1024))
        bracket_limits = engine_cfg.get("limits", dict(ANALYSIS_LIMITS))
        return cls(threads=thread_count, hash_mb=hash_size, limits=bracket_limits)

    @property
    def resolved_threads(self) -> int:
        """Actual thread count: ``threads`` if positive, else worker_count()."""
        if self.threads > 0:
            return self.threads
        return worker_count()

    def to_dict(self) -> dict:
        """Serialize to a dict suitable for JSON storage.

        Returns:
            Dict with threads (resolved to actual count), hash_mb, limits.
        """
        payload = {"threads": self.resolved_threads}
        payload["hash_mb"] = self.hash_mb
        payload["limits"] = self.limits
        return payload

resolved_threads property

Actual thread count (resolves 0/auto to cpu_count - 1).

from_config(config) classmethod

Build settings from a config dict (from config.json).

Parameters:

Name Type Description Default
config dict

Full config dict. Reads the 'analysis_engine' key.

required

Returns:

Type Description
AnalysisSettings

AnalysisSettings with values from config, defaults for missing keys.

Source code in src/chess_self_coach/analysis.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
@classmethod
def from_config(cls, config: dict) -> AnalysisSettings:
    """Create settings from the 'analysis_engine' section of a config dict.

    Args:
        config: Full config dict (from config.json).

    Returns:
        AnalysisSettings with values from config, defaults for missing keys.
    """
    engine_cfg = config.get("analysis_engine", {})

    # "auto" and 0 are both stored as 0 (automatic thread count).
    raw_threads = engine_cfg.get("threads", "auto")
    thread_count = 0 if raw_threads in ("auto", 0) else int(raw_threads)

    hash_size = int(engine_cfg.get("hash_mb", 1024))
    bracket_limits = engine_cfg.get("limits", dict(ANALYSIS_LIMITS))
    return cls(threads=thread_count, hash_mb=hash_size, limits=bracket_limits)

to_dict()

Serialize to a dict suitable for JSON storage.

Returns:

Type Description
dict

Dict with threads (resolved to actual count), hash_mb, limits.

Source code in src/chess_self_coach/analysis.py
84
85
86
87
88
89
90
91
92
93
94
def to_dict(self) -> dict:
    """Serialize to a dict suitable for JSON storage.

    Returns:
        Dict with threads (resolved to actual count), hash_mb, limits.
    """
    payload = {"threads": self.resolved_threads}
    payload["hash_mb"] = self.hash_mb
    payload["limits"] = self.limits
    return payload

analyze_games(*, game_ids=None, max_games=10, reanalyze_all=False, settings=None, engine_path=None, on_progress=None, on_game_done=None, cancel=None)

Fetch games, analyze with Stockfish + APIs, write analysis_data.json.

Phase 1 orchestrator: sequential analysis with one multi-threaded Stockfish. Caller is responsible for invoking generate_training_data() (Phase 2) afterwards.

Parameters:

Name Type Description Default
game_ids list[str] | None

Specific game IDs to analyze from the cache. When set, skips the fetch phase and reads from fetched_games.json. When None or empty, fetches from APIs (original behavior).

None
max_games int

Maximum total games in the dataset (default: 10).

10
reanalyze_all bool

If True, re-analyze games (skip only same-settings).

False
settings AnalysisSettings | None

Override analysis settings. None = load from config.

None
engine_path str | None

Override path to Stockfish binary.

None
on_progress Callable[[dict], None] | None

Optional callback for structured progress events.

None
cancel Event | None

Threading event for cancellation.

None
Source code in src/chess_self_coach/analysis.py
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
def analyze_games(
    *,
    game_ids: list[str] | None = None,
    max_games: int = 10,
    reanalyze_all: bool = False,
    settings: AnalysisSettings | None = None,
    engine_path: str | None = None,
    on_progress: Callable[[dict], None] | None = None,
    on_game_done: Callable[[str, dict], None] | None = None,
    cancel: threading.Event | None = None,
) -> None:
    """Fetch games, analyze with Stockfish + APIs, write analysis_data.json.

    Phase 1 orchestrator: sequential analysis with one multi-threaded Stockfish.
    Caller is responsible for invoking generate_training_data() (Phase 2) afterwards.

    Args:
        game_ids: Specific game IDs to analyze from the cache. When set,
            skips the fetch phase and reads from fetched_games.json.
            When None or empty, fetches from APIs (original behavior).
        max_games: Maximum total games in the dataset (default: 10). Only
            used on the API fetch path; not consulted when game_ids is set.
        reanalyze_all: If True, games that already have stored analysis are
            analyzed again instead of being skipped.
        settings: Override analysis settings. None = load from config.
        engine_path: Override path to Stockfish binary.
        on_progress: Optional callback for structured progress events.
        on_game_done: Optional callback(store_id, game_data) invoked after
            each game's analysis has been saved.
        cancel: Threading event for cancellation. Checked after every
            processed game, whether its analysis succeeded or failed.

    Raises:
        RuntimeError: If no player is configured, or the Syzygy tablebases
            cannot be found.
        FileNotFoundError: If engine_path is given but does not exist.
        AnalysisInterrupted: If cancel is set once a game has been processed.
    """
    from chess_self_coach.config import (
        analysis_data_path,
        check_stockfish_version,
        find_stockfish,
        load_config,
        load_lichess_token,
    )
    from chess_self_coach.importer import fetch_chesscom_games, fetch_lichess_games

    def _emit(event: dict) -> None:
        if on_progress:
            on_progress(event)

    def _raise_if_cancelled(done: int, total: int) -> None:
        # Single place for the cancellation check so the success and the
        # failure path of the per-game loop behave the same way.
        if cancel and cancel.is_set():
            raise AnalysisInterrupted(f"Interrupted. Saved {done}/{total} games.")

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%H:%M:%S",
    )

    config = load_config()
    players = config.get("players", {})
    lichess_user = players.get("lichess", "")
    chesscom_user = players.get("chesscom")

    if not lichess_user and not chesscom_user:
        raise RuntimeError(
            "No player configured. Run 'chess-self-coach setup' to set your Lichess and/or chess.com username."
        )

    # Load settings
    if settings is None:
        settings = AnalysisSettings.from_config(config)
    # Find Stockfish
    if engine_path:
        sf_path = Path(engine_path)
        if not sf_path.exists():
            raise FileNotFoundError(f"Engine not found: {sf_path}")
    else:
        sf_path = find_stockfish(config)
        expected = config.get("stockfish", {}).get("expected_version")
        version = check_stockfish_version(sf_path, expected)
        print(f"  Using {version} at {sf_path}")
        _emit({"phase": "init", "message": f"Using {version}"})

    # Load Lichess token for Opening Explorer
    lichess_token = load_lichess_token(required=False)

    analysis_path = analysis_data_path()

    # Load existing analysis data
    existing_data = load_analysis_data(analysis_path)
    existing_games = existing_data.get("games", {})

    # --- Load games: from cache (game_ids) or from APIs (fetch) ---
    new_games: list[tuple[chess.pgn.Game, str, chess.Color]] = []

    if game_ids:
        # Load specific games from cache (no API fetch needed)
        from chess_self_coach.game_cache import get_cached_game, load_game_cache

        print(f"\n  Loading {len(game_ids)} game(s) from cache...")
        _emit({"phase": "fetch", "message": "Loading from cache...", "percent": 5})

        cache = load_game_cache()
        cached_games = cache.get("games", {})

        for gid in game_ids:
            if gid in existing_games and not reanalyze_all:
                print(f"  Skipped (already analyzed): {gid}")
                continue

            entry = cached_games.get(gid)
            if entry is None:
                print(f"  Warning: game not in cache, skipping: {gid}")
                continue

            game = get_cached_game(gid)
            if game is None:
                continue

            player_color_str = entry.get("player_color", "white")
            player_color = chess.WHITE if player_color_str == "white" else chess.BLACK
            new_games.append((game, gid, player_color))

        _emit(
            {
                "phase": "fetch",
                "message": f"{len(new_games)} game(s) to analyze",
                "percent": 10,
            }
        )
    else:
        # Original behavior: fetch from APIs
        print("\n  Fetching games...")
        _emit({"phase": "fetch", "message": "Fetching games...", "percent": 5})
        all_games: list[chess.pgn.Game] = []

        if lichess_user:
            all_games.extend(fetch_lichess_games(lichess_user, max_games))
        if chesscom_user:
            all_games.extend(fetch_chesscom_games(chesscom_user, max_games))

        if not all_games:
            print("  No games found.")
            _emit({"phase": "done", "message": "No games found.", "percent": 100})
            return

        # Filter games
        reanalyzed = 0
        skipped = 0
        for game in all_games:
            game_id = game.headers.get("Link", game.headers.get("Site", ""))
            if game_id == "?":
                game_id = ""

            white = game.headers.get("White", "?")
            black = game.headers.get("Black", "?")
            if white == "?" and black == "?":
                continue

            player_color = _determine_player_color(game, lichess_user, chesscom_user)
            if player_color is None:
                continue

            is_reanalysis = False
            if game_id and game_id in existing_games:
                if not reanalyze_all:
                    skipped += 1
                    continue
                is_reanalysis = True

            new_games.append((game, game_id, player_color))
            if is_reanalysis:
                reanalyzed += 1

        if skipped:
            print(f"  Skipped {skipped} already-analyzed game(s)")

        # Newest first, then cap: free slots left in the dataset plus the
        # games that are being re-analyzed (they already occupy a slot).
        new_games.sort(
            key=lambda t: t[0].headers.get("Date", "0000.00.00"),
            reverse=True,
        )
        cap = max(0, max_games - len(existing_games)) + reanalyzed
        new_games = new_games[:cap]

        _emit(
            {
                "phase": "fetch",
                "message": f"Found {len(all_games)} game(s) ({len(new_games)} to analyze)",
                "percent": 10,
            }
        )

    if not new_games:
        print("  No new games to analyze.")
        _emit({"phase": "done", "message": "No new games.", "percent": 100})
        return

    # Open Stockfish (one instance, multi-threaded)
    threads = settings.resolved_threads
    hash_mb = settings.hash_mb
    print(
        f"\n  Analyzing {len(new_games)} game(s) with Stockfish ({threads} threads, {hash_mb}MB hash)..."
    )
    print("  This may take several minutes...\n")

    engine = chess.engine.SimpleEngine.popen_uci(str(sf_path))
    # Everything after the launch runs under try/finally so the Stockfish
    # process is shut down even if configuration or the tablebase lookup fails.
    try:
        engine.configure({"Threads": threads, "Hash": hash_mb})

        # Syzygy endgame tablebases
        from chess_self_coach.syzygy import find_syzygy

        syzygy_path = find_syzygy(config)
        if not syzygy_path:
            raise RuntimeError(
                "Syzygy endgame tablebases (3-5 pieces) not found.\n"
                "  Install with: chess-self-coach syzygy download"
            )
        engine.configure({"SyzygyPath": str(syzygy_path)})
        _log.info("Syzygy tablebases: %s", syzygy_path)

        wall_start = _time.time()
        done_count = 0
        total_tasks = len(new_games)
        _emit(
            {
                "phase": "analyze",
                "message": f"Analyzing 0/{total_tasks}",
                "percent": 15,
                "current": 0,
                "total": total_tasks,
            }
        )

        for game, game_id, player_color in new_games:
            done_count += 1
            white = game.headers.get("White", "?")
            black = game.headers.get("Black", "?")
            label = f"{white} vs {black}"

            # Re-analysis: pass existing move data to preserve API results
            prev_moves = None
            if reanalyze_all and game_id and game_id in existing_games:
                prev_moves = existing_games[game_id].get("moves")

            def _on_wait(attempt: int, delay: float) -> None:
                msg = f"API rate limit, retry #{attempt} in {delay:.0f}s ({label})"
                print(f"  ⏳ {msg}")
                _emit({"phase": "analyze", "message": msg, "waiting": True})

            start = _time.time()
            game_data: dict | None = None
            try:
                game_data = collect_game_data(
                    game,
                    engine,
                    player_color,
                    settings,
                    lichess_token,
                    game_id=game_id,
                    existing_moves=prev_moves,
                    on_wait=_on_wait,
                )
            except Exception as exc:
                # Best effort: one failing game must not abort the whole batch.
                _emit({"phase": "analyze", "message": str(exc), "error": True})
                print(f"  [{done_count}/{total_tasks}] Error analyzing {label}: {exc}")

            if game_data is None:
                # Failed game: nothing to store, but still honour a pending
                # cancellation before moving on to the next game.
                _raise_if_cancelled(done_count, total_tasks)
                continue

            elapsed = _time.time() - start

            # Store analysis duration for ETA estimation
            game_data["analysis_duration_s"] = round(elapsed, 1)

            # Per-game summary
            _moves = game_data["moves"]
            _opening = [m for m in _moves if m.get("in_opening")]
            _other = [m for m in _moves if not m.get("in_opening")]
            _log.info(
                "Game %d/%d: %s — %d moves in %.1fs",
                done_count,
                total_tasks,
                label,
                len(_moves),
                elapsed,
            )
            if _opening:
                _op_ms = sum(
                    m["timing_ms"]["eval_before"] + m["timing_ms"]["eval_after"]
                    for m in _opening
                )
                _log.info("  Opening: %d moves in %.1fs", len(_opening), _op_ms / 1000)
            if _other:
                _ot_ms = sum(
                    m["timing_ms"]["eval_before"] + m["timing_ms"]["eval_after"]
                    for m in _other
                )
                # Count non-opening moves per evaluation source
                _src_counts: dict[str, int] = {}
                for m in _other:
                    s = m["eval_source"]
                    _src_counts[s] = _src_counts.get(s, 0) + 1
                _src_str = ", ".join(f"{k}: {v}" for k, v in _src_counts.items())
                _log.info(
                    "  Non-opening: %d moves (%s) in %.1fs",
                    len(_other),
                    _src_str,
                    _ot_ms / 1000,
                )

            # Store in analysis data
            store_id = game_id or f"unknown_{done_count}"
            existing_data.setdefault("games", {})[store_id] = game_data
            existing_data["player"] = {
                "lichess": lichess_user,
                "chesscom": chesscom_user or "",
            }

            # Atomic write after each game (crash-safe)
            save_analysis_data(existing_data, analysis_path)

            # Run downstream phases so the game is usable in the UI
            if on_game_done:
                on_game_done(store_id, game_data)

            # Progress: ETA from the average wall time per processed game
            move_count = len(game_data["moves"])
            wall_elapsed = _time.time() - wall_start
            avg_per_game = wall_elapsed / done_count
            remaining = avg_per_game * (total_tasks - done_count)
            eta_min, eta_sec = divmod(int(remaining), 60)
            eta_str = f"{eta_min}m{eta_sec:02d}s" if eta_min else f"{eta_sec}s"

            print(
                f"  [{done_count}/{total_tasks}] {label}... "
                f"{move_count} moves ({elapsed:.1f}s) — ETA {eta_str}"
            )
            # Analysis phase spans 15%..90% of the overall progress bar
            pct = 15 + int(75 * done_count / total_tasks)
            _emit(
                {
                    "phase": "analyze",
                    "message": f"Analyzing {done_count}/{total_tasks}: {label}",
                    "percent": pct,
                    "current": done_count,
                    "total": total_tasks,
                }
            )

            # Check cancel
            _raise_if_cancelled(done_count, total_tasks)
    finally:
        engine.quit()

    total_games = len(existing_data.get("games", {}))
    print(f"\n  Analysis data saved: {analysis_path}")
    print(f"  Total games analyzed: {total_games}")
    _emit(
        {
            "phase": "done",
            "message": f"Analysis complete. {total_games} games.",
            "percent": 100,
        }
    )

collect_game_data(game, engine, player_color, settings, lichess_token=None, game_id='', existing_moves=None, on_wait=None)

Collect full per-move analysis data for one game (Phase 1).

Source hierarchy per move
  1. Tablebase (priority if ≤7 pieces — perfect information)
  2. Masters opening explorer (in_opening=True, cp_loss=0)
  3. Cloud eval (depth 50-70, cp_loss computed)
  4. Stockfish (fallback, always re-run on re-analysis)

When existing_moves is provided (re-analysis), API data (masters, cloud eval, tablebase) is preserved and only breakpoints are re-tested. Stockfish positions are always re-run regardless.

Parameters:

Name Type Description Default
game Game

Parsed PGN game.

required
engine SimpleEngine

Running Stockfish engine (already configured with threads/hash).

required
player_color Color

Which color the player was.

required
settings AnalysisSettings

Analysis settings (for limits and storage).

required
lichess_token str | None

Lichess API token for Opening Explorer. None to skip.

None
game_id str

Unique game identifier. Passed to engine.analyse() so python-chess sends ucinewgame between different games (hash table reset).

''
existing_moves list[dict[str, Any]] | None

Previous per-move data from a prior analysis. When set, preserves API data and only re-tests breakpoints + re-runs Stockfish.

None
on_wait Callable[[int, float], None] | None

Optional callback(attempt, delay_seconds) called when an API request is retrying after a transient error (429, 5xx).

None

Returns:

Type Description
dict[str, Any]

Dict with game headers, settings, and moves[] array ready for storage in analysis_data.json.

Source code in src/chess_self_coach/analysis.py
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
def collect_game_data(
    game: chess.pgn.Game,
    engine: chess.engine.SimpleEngine,
    player_color: chess.Color,
    settings: AnalysisSettings,
    lichess_token: str | None = None,
    game_id: str = "",
    existing_moves: list[dict[str, Any]] | None = None,
    on_wait: Callable[[int, float], None] | None = None,
) -> dict[str, Any]:
    """Collect full per-move analysis data for one game (Phase 1).

    Source hierarchy per move:
      1. Tablebase (priority if ≤7 pieces — perfect information)
      2. Masters opening explorer (in_opening=True, cp_loss=0)
      3. Cloud eval (depth 50-70, cp_loss computed)
      4. Stockfish (fallback, always re-run on re-analysis)

    When *existing_moves* is provided (re-analysis), API data (masters,
    cloud eval, tablebase) is preserved and only breakpoints are re-tested.
    Stockfish positions are always re-run regardless.

    Args:
        game: Parsed PGN game.
        engine: Running Stockfish engine (already configured with threads/hash).
        player_color: Which color the player was.
        settings: Analysis settings (for limits and storage).
        lichess_token: Lichess API token for Opening Explorer. None to skip.
        game_id: Unique game identifier. Passed to engine.analyse() so python-chess
            sends ucinewgame between different games (hash table reset).
        existing_moves: Previous per-move data from a prior analysis. When set,
            preserves API data and only re-tests breakpoints + re-runs Stockfish.
        on_wait: Optional callback(attempt, delay_seconds) called when an API
            request is retrying after a transient error (429, 5xx).

    Returns:
        Dict with game headers, settings, and moves[] array ready for
        storage in analysis_data.json.
    """
    limits = settings.limits
    moves_data: list[dict] = []

    # Collect all (fen, move_uci) pairs for opening explorer batch query
    fens_and_moves: list[tuple[str, str]] = []
    node = game
    while node.variations:
        board = node.board()
        next_node = node.variations[0]
        fens_and_moves.append((board.fen(), next_node.move.uci()))
        node = next_node

    # Query Masters Opening Explorer (stops at departure)
    explorer_results: list[dict | None] = [None] * len(fens_and_moves)
    if lichess_token:
        from chess_self_coach.opening_explorer import query_opening_sequence

        existing_explorer = (
            [m.get("opening_explorer") for m in existing_moves]
            if existing_moves
            else None
        )
        explorer_results = query_opening_sequence(
            fens_and_moves, lichess_token, existing_results=existing_explorer
        )

    # Walk through the game and collect eval data for each move
    node = game
    ply = 0
    # Cache: eval_before for current position (reused as eval_after of previous move)
    cached_eval: dict[str, Any] | None = None
    cached_tb: dict[str, Any] | None = None
    cached_mpv: dict[str, Any] | None = None
    cloud_departed = False
    prev_player_clock: float | None = None
    prev_opponent_clock: float | None = None

    while node.variations:
        board = node.board()
        next_node = node.variations[0]
        actual_move = next_node.move
        piece_count = len(board.piece_map())
        side = "white" if board.turn == chess.WHITE else "black"

        # --- Board enrichments ---
        board_after = board.copy()
        board_after.push(actual_move)
        is_check = board_after.is_check()
        is_capture = board.is_capture(actual_move)
        is_castling = board.is_castling(actual_move)
        is_en_passant = board.is_en_passant(actual_move)
        is_promotion = actual_move.promotion is not None
        promoted_to = None
        if is_promotion and actual_move.promotion is not None:
            promoted_to = chess.piece_symbol(actual_move.promotion)

        # --- Clock data ---
        player_clock = next_node.clock()
        opponent_clock = None
        if next_node.variations:
            opponent_clock = next_node.variations[0].clock()

        # Compute time spent (difference from previous clock reading for the same side)
        time_spent = None
        if side == ("white" if player_color == chess.WHITE else "black"):
            # Player's move
            if player_clock is not None and prev_player_clock is not None:
                time_spent = prev_player_clock - player_clock
        else:
            # Opponent's move
            if opponent_clock is not None and prev_opponent_clock is not None:
                time_spent = prev_opponent_clock - opponent_clock

        # --- Opening Explorer: determine if move is in Masters theory ---
        explorer_data = explorer_results[ply] if ply < len(explorer_results) else None
        in_opening = (
            explorer_data is not None
            and explorer_data.get("_source") == "masters"
        )

        # --- Existing move data (re-analysis) ---
        existing: dict[str, Any] | None = (
            existing_moves[ply]
            if existing_moves is not None and ply < len(existing_moves)
            else None
        )

        # --- board_after_fen needed by all tiers ---
        board_after_fen = board_after.fen()

        # --- Tier dispatch: tablebase → masters+cloud → cloud → stockfish ---
        tb_before: dict[str, Any] | None = None
        tb_after: dict[str, Any] | None = None
        mpv_before: dict[str, Any] | None = None
        pc_after = len(board_after.piece_map())

        if piece_count <= MAX_PIECES:
            # ── Tier 1: Tablebase (priority, ≤7 pieces, perfect information) ──
            # Probe tablebase for current position
            _tb_probed: dict[str, Any] | None = None
            if existing and existing.get("tablebase_before"):
                _tb_probed = existing["tablebase_before"]
            else:
                _tb_probed = probe_position_full(board.fen(), on_wait=on_wait)

            t0 = _time.time()
            if cached_eval is not None and cached_tb is not None:
                score_before = cached_eval
                tb_before = cached_tb
                mpv_before = cached_mpv
                _eb_src = "cache"
            elif _tb_probed is not None:
                tb_before = _tb_probed
                score_before = _tb_to_eval(_tb_probed, board.turn)
                _eb_src = "tablebase"
            else:
                infos = engine.analyse(
                    board, _analysis_limit_from_settings(board, limits),
                    multipv=MULTIPV, game=game_id,
                )
                score_before = _extract_eval(infos[0], board)
                mpv_before = _extract_multipv(infos, board)
                _eb_src = "sf_fallback"
            score_before_ms = (_time.time() - t0) * 1000

            # Probe tablebase for position after move
            _tb_probed_after: dict[str, Any] | None = None
            if existing and existing.get("tablebase_after"):
                _tb_probed_after = existing["tablebase_after"]
            elif pc_after <= MAX_PIECES:
                _tb_probed_after = probe_position_full(board_after_fen, on_wait=on_wait)

            t0 = _time.time()
            if _tb_probed_after is not None:
                tb_after = _tb_probed_after
                score_after = _tb_to_eval(_tb_probed_after, board_after.turn)
                cached_eval = score_after
                cached_tb = _tb_probed_after
                cached_mpv = None
                _ea_src = "tablebase"
            else:
                infos_after = engine.analyse(
                    board_after,
                    _analysis_limit_from_settings(board_after, limits),
                    multipv=MULTIPV, game=game_id,
                )
                score_after = _extract_eval(infos_after[0], board_after)
                cached_eval = score_after
                cached_tb = None
                cached_mpv = _extract_multipv(infos_after, board_after)
                _ea_src = "stockfish"
            score_after_ms = (_time.time() - t0) * 1000

            tier_source = "tablebase" if tb_before is not None and tb_after is not None else (
                "stockfish+tablebase" if tb_before is not None or tb_after is not None else "stockfish"
            )

        else:
            # ── Tiers 2-4: Cloud scoring / Stockfish (>7 pieces) ──
            tier_source = "stockfish"  # default, overridden below

            # Can we reuse preserved cloud scoring data?
            _preserved_cloud = (
                existing is not None
                and existing.get("eval_source") == "cloud_eval"
            )

            if in_opening and _preserved_cloud:
                # ── Tier 2a: Masters move with preserved cloud scoring ──
                assert existing is not None  # guaranteed by _preserved_cloud
                t0 = _time.time()
                if cached_eval is not None:
                    score_before = cached_eval
                    mpv_before = cached_mpv
                    _eb_src = "cache"
                else:
                    score_before = existing["eval_before"]
                    mpv_before = existing.get("multipv_before")
                    _eb_src = "preserved"
                score_before_ms = (_time.time() - t0) * 1000

                t0 = _time.time()
                score_after = existing["eval_after"]
                _ea_src = "preserved"
                score_after_ms = (_time.time() - t0) * 1000

                tier_source = "cloud_eval"
                cached_eval = score_after
                cached_tb = None
                cached_mpv = None

            elif in_opening:
                # ── Tier 2b: Masters move, fresh cloud scoring query ──
                t0 = _time.time()
                if cached_eval is not None:
                    score_before = cached_eval
                    mpv_before = cached_mpv
                    _eb_src = "cache"
                else:
                    _lbl = f"[ply {ply+1} before] "
                    cloud = query_cloud_eval(
                        board.fen(), on_wait=on_wait, log_label=_lbl)
                    if cloud:
                        score_before = _cloud_eval_to_eval(cloud, board)
                        _eb_src = "cloud_eval"
                    else:
                        infos = engine.analyse(
                            board,
                            _analysis_limit_from_settings(board, limits),
                            multipv=MULTIPV, game=game_id,
                        )
                        score_before = _extract_eval(infos[0], board)
                        mpv_before = _extract_multipv(infos, board)
                        _eb_src = "sf_fallback"
                score_before_ms = (_time.time() - t0) * 1000

                t0 = _time.time()
                _lbl = f"[ply {ply+1} after] "
                cloud_after = query_cloud_eval(
                    board_after_fen, on_wait=on_wait, log_label=_lbl)
                if cloud_after:
                    score_after = _cloud_eval_to_eval(cloud_after, board_after)
                    _ea_src = "cloud_eval"
                    cached_mpv = None
                else:
                    infos_after = engine.analyse(
                        board_after,
                        _analysis_limit_from_settings(board_after, limits),
                        multipv=MULTIPV, game=game_id,
                    )
                    score_after = _extract_eval(infos_after[0], board_after)
                    cached_mpv = _extract_multipv(infos_after, board_after)
                    _ea_src = "sf_fallback"
                score_after_ms = (_time.time() - t0) * 1000

                tier_source = "cloud_eval" if _ea_src == "cloud_eval" else "stockfish"
                cached_eval = score_after
                cached_tb = None

            elif not cloud_departed and _preserved_cloud:
                # ── Tier 3a: Post-masters, preserved cloud scoring ──
                assert existing is not None  # guaranteed by _preserved_cloud
                t0 = _time.time()
                if cached_eval is not None:
                    score_before = cached_eval
                    mpv_before = cached_mpv
                    _eb_src = "cache"
                else:
                    score_before = existing["eval_before"]
                    mpv_before = existing.get("multipv_before")
                    _eb_src = "preserved"
                score_before_ms = (_time.time() - t0) * 1000

                t0 = _time.time()
                score_after = existing["eval_after"]
                _ea_src = "preserved"
                score_after_ms = (_time.time() - t0) * 1000

                tier_source = "cloud_eval"
                cached_eval = score_after
                cached_tb = None
                cached_mpv = None

            elif not cloud_departed:
                # ── Tier 3b: Post-masters, fresh cloud scoring query ──
                t0 = _time.time()
                if cached_eval is not None:
                    score_before = cached_eval
                    mpv_before = cached_mpv
                    _eb_src = "cache"
                else:
                    _lbl = f"[ply {ply+1} before] "
                    cloud = query_cloud_eval(
                        board.fen(), on_wait=on_wait, log_label=_lbl)
                    if cloud:
                        score_before = _cloud_eval_to_eval(cloud, board)
                        _eb_src = "cloud_eval"
                    else:
                        infos = engine.analyse(
                            board,
                            _analysis_limit_from_settings(board, limits),
                            multipv=MULTIPV, game=game_id,
                        )
                        score_before = _extract_eval(infos[0], board)
                        mpv_before = _extract_multipv(infos, board)
                        _eb_src = "sf_fallback"
                score_before_ms = (_time.time() - t0) * 1000

                t0 = _time.time()
                _lbl = f"[ply {ply+1} after] "
                cloud_after = query_cloud_eval(
                    board_after_fen, on_wait=on_wait, log_label=_lbl)
                if cloud_after:
                    score_after = _cloud_eval_to_eval(cloud_after, board_after)
                    _ea_src = "cloud_eval"
                    tier_source = "cloud_eval"
                    cached_mpv = None
                else:
                    cloud_departed = True
                    infos_after = engine.analyse(
                        board_after,
                        _analysis_limit_from_settings(board_after, limits),
                        multipv=MULTIPV, game=game_id,
                    )
                    score_after = _extract_eval(infos_after[0], board_after)
                    cached_mpv = _extract_multipv(infos_after, board_after)
                    _ea_src = "stockfish"
                score_after_ms = (_time.time() - t0) * 1000

                cached_eval = score_after
                cached_tb = None

            else:
                # ── Tier 4: Stockfish (fallback, always re-run) ──
                t0 = _time.time()
                if cached_eval is not None:
                    score_before = cached_eval
                    mpv_before = cached_mpv
                    _eb_src = "cache"
                else:
                    infos = engine.analyse(
                        board,
                        _analysis_limit_from_settings(board, limits),
                        multipv=MULTIPV, game=game_id,
                    )
                    score_before = _extract_eval(infos[0], board)
                    mpv_before = _extract_multipv(infos, board)
                    _eb_src = "stockfish"
                score_before_ms = (_time.time() - t0) * 1000

                t0 = _time.time()
                infos_after = engine.analyse(
                    board_after,
                    _analysis_limit_from_settings(board_after, limits),
                    multipv=MULTIPV, game=game_id,
                )
                score_after = _extract_eval(infos_after[0], board_after)
                _ea_src = "stockfish"
                score_after_ms = (_time.time() - t0) * 1000

                cached_eval = score_after
                cached_tb = None
                cached_mpv = _extract_multipv(infos_after, board_after)

        # --- Unified variable names for the rest of the loop ---
        eval_before = score_before
        eval_after = score_after
        eval_before_ms = score_before_ms
        eval_after_ms = score_after_ms
        eval_source = tier_source
        tb_before_stored = tb_before
        tb_after_stored = tb_after

        # --- cp_loss: 0 for opening (masters), computed for everything else ---
        cp_loss = 0
        if not in_opening:
            before_cp = eval_before.get("score_cp")
            after_cp = eval_after.get("score_cp")
            if before_cp is not None and after_cp is not None:
                best_uci = eval_before.get("best_move_uci", "")
                if best_uci and actual_move == chess.Move.from_uci(best_uci):
                    cp_loss = 0
                elif board.turn == chess.WHITE:
                    cp_loss = max(0, before_cp - after_cp)
                else:
                    cp_loss = max(0, after_cp - before_cp)

        _source_tag = (
            "[book] " if in_opening
            else "[cloud] " if eval_source == "cloud_eval"
            else "[tb] " if eval_source == "tablebase"
            else ""
        )
        _log.info(
            "  ply %d %s%s: %s — before=%s(%.0fms cp=%s) after=%s(%.0fms cp=%s) cp_loss=%d",
            ply + 1,
            _source_tag,
            board.san(actual_move),
            eval_source,
            _eb_src,
            eval_before_ms,
            eval_before.get("score_cp"),
            _ea_src,
            eval_after_ms,
            eval_after.get("score_cp"),
            cp_loss,
        )

        # --- Build move dict ---
        move_dict = {
            "ply": ply + 1,
            "fen_before": board.fen(),
            "fen_after": board_after_fen,
            "move_san": board.san(actual_move),
            "move_uci": actual_move.uci(),
            "side": side,
            "eval_source": eval_source,
            "in_opening": in_opening,
            "eval_before": eval_before,
            "eval_after": eval_after,
            "multipv_before": mpv_before,
            "tablebase_before": tb_before_stored,
            "tablebase_after": tb_after_stored,
            "opening_explorer": explorer_data,
            "cp_loss": cp_loss,
            "board": {
                "piece_count": piece_count,
                "is_check": is_check,
                "is_capture": is_capture,
                "is_castling": is_castling,
                "is_en_passant": is_en_passant,
                "is_promotion": is_promotion,
                "promoted_to": promoted_to,
                "legal_moves_count": len(list(board.legal_moves)),
            },
            "clock": {
                "player": player_clock,
                "opponent": opponent_clock,
                "time_spent": round(time_spent, 1) if time_spent is not None else None,
            },
            "timing_ms": {
                "eval_before": round(eval_before_ms, 1),
                "eval_after": round(eval_after_ms, 1),
            },
        }
        moves_data.append(move_dict)

        # Update state for next iteration
        if side == ("white" if player_color == chess.WHITE else "black"):
            prev_player_clock = player_clock
        else:
            prev_opponent_clock = opponent_clock

        ply += 1
        node = next_node

    # --- Build game-level dict ---
    p_color = "white" if player_color == chess.WHITE else "black"
    game_id = game.headers.get("Link", game.headers.get("Site", ""))
    source = (
        "lichess"
        if "lichess.org" in game_id
        else ("chess.com" if "chess.com" in game_id else "unknown")
    )

    return {
        "headers": {
            "white": game.headers.get("White", "?"),
            "black": game.headers.get("Black", "?"),
            "date": game.headers.get("Date", "?"),
            "result": game.headers.get("Result", "*"),
            "opening": game.headers.get("Opening", game.headers.get("Event", "?")),
            "source": source,
            "link": game_id,
        },
        "player_color": p_color,
        "analyzed_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "settings": settings.to_dict(),
        "moves": moves_data,
    }

load_analysis_data(path=None)

Load analysis_data.json, returning empty structure if not found.

Parameters:

Name Type Description Default
path Path | None

Path to analysis_data.json. Defaults to data directory.

None

Returns:

Type Description
dict

Parsed dict with at least {version, player, games}.

Source code in src/chess_self_coach/analysis.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def load_analysis_data(path: Path | None = None) -> dict:
    """Load analysis_data.json, returning empty structure if not found.

    Args:
        path: Path to analysis_data.json. Defaults to data directory.

    Returns:
        Parsed dict with at least {version, player, games}.
    """
    if path is None:
        path = _default_analysis_path()
    if not path.exists():
        return {"version": "1.0", "player": {}, "games": {}}
    try:
        with open(path) as f:
            return json.load(f)
    except (json.JSONDecodeError, KeyError):
        _log.warning("Corrupted analysis_data.json, returning empty structure")
        return {"version": "1.0", "player": {}, "games": {}}

save_analysis_data(data, path=None)

Atomically write analysis_data.json.

Parameters:

Name Type Description Default
data dict

Full analysis data dict.

required
path Path | None

Target path. Defaults to data directory.

None
Source code in src/chess_self_coach/analysis.py
118
119
120
121
122
123
124
125
126
127
128
def save_analysis_data(data: dict, path: Path | None = None) -> None:
    """Write the analysis data dict to disk through ``atomic_write_json``.

    The ``version`` key of *data* is set to ``"1.0"`` in place before writing.

    Args:
        data: Full analysis data dict (its ``version`` key is overwritten).
        path: Target path. Defaults to data directory.
    """
    target = _default_analysis_path() if path is None else path
    data["version"] = "1.0"
    atomic_write_json(target, data)

settings_match(stored, current)

Check if stored analysis settings match current settings.

Used to skip re-analysis of games already analyzed with identical settings.

Parameters:

Name Type Description Default
stored dict

Settings dict from a previously analyzed game.

required
current dict

Current settings dict.

required

Returns:

Type Description
bool

True if settings are equivalent.

Source code in src/chess_self_coach/analysis.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def settings_match(stored: dict, current: dict) -> bool:
    """Tell whether two analysis settings dicts are equivalent.

    Only the ``threads``, ``hash_mb`` and ``limits`` entries are compared;
    a key missing on either side is treated as ``None``. Used to skip
    re-analysis of games already analyzed with identical settings.

    Args:
        stored: Settings dict from a previously analyzed game.
        current: Current settings dict.

    Returns:
        True if settings are equivalent.
    """
    compared_keys = ("threads", "hash_mb", "limits")
    return all(stored.get(key) == current.get(key) for key in compared_keys)

Training mode: explanation generation, move classification, and training data utilities.

Pure functions for generating rule-based explanations, classifying mistakes by centipawn loss, and managing training_data.json (stats, refresh). The heavy analysis pipeline lives in analysis.py (Phase 1: collection, Phase 2: derivation).

classify_mistake(cp_loss)

Classify a move by centipawn loss.

Returns:

Type Description
str | None

Category string or None if the move is acceptable.

Source code in src/chess_self_coach/trainer.py
42
43
44
45
46
47
48
49
50
51
52
53
54
def classify_mistake(cp_loss: int) -> str | None:
    """Map a centipawn loss onto a mistake category.

    Thresholds are checked in the order blunder, mistake, inaccuracy; the
    first one reached determines the result.

    Returns:
        Category string or None if the move is acceptable.
    """
    tiers = (
        (BLUNDER_THRESHOLD, "blunder"),
        (MISTAKE_THRESHOLD, "mistake"),
        (INACCURACY_THRESHOLD, "inaccuracy"),
    )
    for floor, label in tiers:
        if cp_loss >= floor:
            return label
    return None

format_score_cp(cp)

Format centipawn value as score string like '+0.32'.

Source code in src/chess_self_coach/trainer.py
32
33
34
35
36
37
38
def format_score_cp(cp: int | None) -> str:
    """Format centipawn value as score string like '+0.32'."""
    if cp is None:
        return "+0.00"
    value = cp / 100.0
    sign = "+" if value >= 0 else ""
    return f"{sign}{value:.2f}"

generate_context(category, cp_loss, was_mate, score_after_cp, fen='', score_before_cp=None, player_color='white')

Generate a short context sentence shown BEFORE the player answers.

Includes game phase, advantage context, and what went wrong.

Source code in src/chess_self_coach/trainer.py
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
def generate_context(
    category: str,
    cp_loss: int,
    was_mate: bool,
    score_after_cp: int | None,
    fen: str = "",
    score_before_cp: int | None = None,
    player_color: str = "white",
) -> str:
    """Build the short context sentence shown BEFORE the player answers.

    The sentence starts with a prefix (game phase when a FEN is given, the
    player's color, and the advantage description when a score before the
    move is given) followed by a statement of what went wrong.

    Note: ``category`` is accepted but not used by this function.
    """
    has_score_after = score_after_cp is not None
    mate_after = has_score_after and abs(score_after_cp) >= MATE_CP

    phase = _detect_game_phase(fen) if fen else ""
    color_label = f"playing as {player_color.capitalize()}"
    if score_before_cp is None:
        advantage = ""
    else:
        advantage = _describe_advantage(score_before_cp, player_color)

    # Without a phase the color label opens the sentence on its own.
    if not phase:
        prefix = f"{color_label.capitalize()}."
    elif advantage:
        prefix = f"{phase}, {color_label}, {advantage}."
    else:
        prefix = f"{phase}, {color_label}."

    if was_mate:
        # A forced mate existed before the move; a near-zero score after it
        # is reported as a draw.
        if has_score_after and abs(score_after_cp) < 50:
            return f"{prefix} Your move threw away a winning position and led to a draw."
        return f"{prefix} Your move threw away a forced mate."

    if mate_after or cp_loss >= MATE_CP:
        return f"{prefix} Your move allowed your opponent to force checkmate."

    pawns = cp_loss / 100.0
    if pawns >= 5:
        outcome = "Your move lost a decisive advantage."
    elif pawns >= 2:
        outcome = f"Your move lost significant material ({pawns:.1f} pawns)."
    elif pawns >= 1:
        outcome = f"Your move cost about {pawns:.1f} pawns."
    else:
        outcome = f"Your move was inaccurate ({pawns:.1f} pawns)."
    return f"{prefix} {outcome}"

generate_explanation(board, actual_san, best_san, cp_loss, category, was_mate=False, score_after_cp=None)

Generate a rule-based explanation for a mistake.

Detects basic patterns: missed captures, missed checks/checkmates, hanging pieces, stalemate. Falls back to a generic template.

Parameters:

Name Type Description Default
board Board

Board position BEFORE the move was played.

required
actual_san str

The move the player made (SAN).

required
best_san str

The best move according to Stockfish (SAN).

required
cp_loss int

Centipawn loss.

required
category str

Mistake category string.

required
was_mate bool

True if the position before was a forced mate.

False
score_after_cp int | None

Score after the move (white perspective), for context.

None

Returns:

Type Description
str

Explanation string.

Source code in src/chess_self_coach/trainer.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def generate_explanation(
    board: chess.Board,
    actual_san: str,
    best_san: str,
    cp_loss: int,
    category: str,
    was_mate: bool = False,
    score_after_cp: int | None = None,
) -> str:
    """Generate a rule-based explanation for a mistake.

    Detects basic patterns: missed captures, missed checks/checkmates,
    hanging pieces, stalemate. Falls back to a generic template.

    The result is a sequence of sentences joined by single spaces: an opening
    sentence about the played move, an optional stalemate note, sentences
    about the best move, and an optional note that the moved piece is left
    undefended. When ``best_san`` cannot be parsed on ``board``, or when it
    delivers checkmate, the function returns early and the later checks
    (capture, check, undefended piece) are skipped.

    Args:
        board: Board position BEFORE the move was played.
        actual_san: The move the player made (SAN).
        best_san: The best move according to Stockfish (SAN).
        cp_loss: Centipawn loss.
        category: Mistake category string.
        was_mate: True if the position before was a forced mate.
        score_after_cp: Score after the move (white perspective), for context.

    Returns:
        Explanation string.
    """
    # A score at or beyond MATE_CP in either direction is treated as a mate score.
    score_after_is_mate = score_after_cp is not None and abs(score_after_cp) >= MATE_CP

    # Build opening sentence with appropriate phrasing
    if was_mate and score_after_cp is not None and abs(score_after_cp) < 50:
        # Had a forced mate, and the score after the move is within 50cp of zero.
        parts = [f"You played {actual_san} ({category}). You had a forced mate but threw it away — the game is now a draw."]
    elif was_mate:
        parts = [f"You played {actual_san} ({category}). You had a forced mate but lost it."]
    elif score_after_is_mate:
        parts = [f"You played {actual_san} ({category}). This allowed your opponent to force checkmate."]
    else:
        loss_str = _format_cp_loss_human(cp_loss)
        parts = [f"You played {actual_san} ({category}, lost {loss_str})."]

    # Analyze the actual move for immediate stalemate detection
    # board_after_actual stays None when actual_san cannot be parsed; in that
    # case actual_move is never bound, so later code must check
    # board_after_actual before touching actual_move.
    board_after_actual = None
    try:
        actual_move = board.parse_san(actual_san)
        board_after_actual = board.copy()
        board_after_actual.push(actual_move)
        if board_after_actual.is_stalemate():
            parts.append("This leads to stalemate (draw)!")
    except ValueError:
        pass

    try:
        best_move = board.parse_san(best_san)
    except ValueError:
        # Best move is not parseable on this board: name it and stop here.
        parts.append(f"A better move was {best_san}.")
        return " ".join(parts)

    # Check if best move delivers checkmate
    board_after_best = board.copy()
    board_after_best.push(best_move)
    if board_after_best.is_checkmate():
        parts.append(f"{best_san} was checkmate!")
        return " ".join(parts)

    # Check if best move captures a piece
    if board.is_capture(best_move):
        captured_piece = board.piece_at(best_move.to_square)
        if captured_piece is None:
            # A capture onto an empty destination square is reported as en passant.
            parts.append(f"{best_san} wins a pawn (en passant).")
        else:
            piece_name = chess.piece_name(captured_piece.piece_type)
            parts.append(f"You missed capturing the {piece_name} with {best_san}.")
    else:
        parts.append(f"A better move was {best_san}.")

    # Check if best move gives check
    if board_after_best.is_check():
        parts.append(f"{best_san} also gives check.")

    # Check if the actual move hangs a piece
    if board_after_actual:
        # Piece type is read from the pre-move board at the origin square.
        moving_piece = board.piece_at(actual_move.from_square)
        if moving_piece:
            # board.turn is still the mover's color because `board` itself is
            # never pushed; `not board.turn` is therefore the opponent.
            attacked = board_after_actual.is_attacked_by(
                not board.turn, actual_move.to_square
            )
            defended = board_after_actual.is_attacked_by(
                board.turn, actual_move.to_square
            )
            if attacked and not defended:
                piece_name = chess.piece_name(moving_piece.piece_type)
                sq_name = chess.square_name(actual_move.to_square)
                parts.append(f"Your {piece_name} on {sq_name} is left undefended.")

    return " ".join(parts)

get_stats_data()

Compute training statistics from training_data.json.

Returns:

Type Description
dict

Dict with keys: generated, total, by_category, by_source.

Raises:

Type Description
FileNotFoundError

If training_data.json does not exist.

Source code in src/chess_self_coach/trainer.py
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
def get_stats_data() -> dict:
    """Summarize the positions stored in training_data.json.

    Positions are counted per ``category`` and per ``game.source``; entries
    lacking either field are counted under ``"unknown"``.

    Returns:
        Dict with keys: generated, total, by_category, by_source.

    Raises:
        FileNotFoundError: If training_data.json does not exist.
    """
    stats_file = training_data_path()
    if not stats_file.exists():
        raise FileNotFoundError(f"No training data at {stats_file}")

    with open(stats_file) as handle:
        payload = json.load(handle)

    entries = payload.get("positions", [])

    per_category: dict[str, int] = {}
    per_source: dict[str, int] = {}
    for entry in entries:
        label = entry.get("category", "unknown")
        per_category[label] = per_category.get(label, 0) + 1

        origin = entry.get("game", {}).get("source", "unknown")
        per_source[origin] = per_source.get(origin, 0) + 1

    return {
        "generated": payload.get("generated", "unknown"),
        "total": len(entries),
        "by_category": per_category,
        "by_source": per_source,
    }

print_stats()

Show training progress from training_data.json.

Source code in src/chess_self_coach/trainer.py
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
def print_stats() -> None:
    """Print a summary of training_data.json to stdout.

    Exits with status 1 (message on stderr) when the training data file does
    not exist. Prints a one-line notice when it holds no positions.
    """
    try:
        stats = get_stats_data()
    except FileNotFoundError:
        print(
            "No training data found. Run: chess-self-coach train --prepare",
            file=sys.stderr,
        )
        sys.exit(1)

    total = stats["total"]
    if total == 0:
        print("  No positions in training data.")
        return

    print("\n  Training Data Stats")
    print(f"  Generated: {stats['generated']}")
    print(f"  Total positions: {total}")

    # Categories are listed in a fixed order; absent ones show as 0.
    print("\n  By category:")
    by_category = stats["by_category"]
    for label in ("blunder", "mistake", "inaccuracy"):
        print(f"    {label.capitalize()}: {by_category.get(label, 0)}")

    # Sources are listed alphabetically.
    print("\n  By source:")
    for origin, count in sorted(stats["by_source"].items()):
        print(f"    {origin}: {count}")

refresh_explanations()

Regenerate explanations in training_data.json without re-running Stockfish.

Reads existing positions, rebuilds explanations using generate_explanation(), and writes back. SRS progress and all other fields are preserved.

Source code in src/chess_self_coach/trainer.py
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
def refresh_explanations() -> None:
    """Regenerate explanations in training_data.json without re-running Stockfish.

    Steps, in order:
      1. Drop positions where the player's move equals the best move.
      2. Drop positions that are decisive both before and after the move
         (both beyond DOMINATED_POSITION_CP for the same side).
      3. For Black tablebase positions, flip "TB:win"/"TB:loss" score strings.
      4. Rebuild ``explanation`` and ``context`` for every remaining position
         and repair an "unknown" game source when the game id names a platform.
      5. Write the file back through ``atomic_write_json``.

    SRS progress and all other fields of the remaining positions are preserved.
    Exits with status 1 when the training data file does not exist.
    """
    data_path = training_data_path()

    if not data_path.exists():
        print("No training data found. Run: chess-self-coach train --prepare", file=sys.stderr)
        sys.exit(1)

    with open(data_path) as f:
        data = json.load(f)

    positions = data.get("positions", [])

    def _parse_score_cp(s: str) -> int | None:
        """Convert a score string such as '+0.29' to centipawns; None if not numeric."""
        try:
            # round(), not int(): float("0.29") * 100 is 28.999999999999996,
            # which int() would truncate to 28 instead of 29.
            return round(float(s) * 100)
        except (ValueError, TypeError):
            return None

    # Remove invalid positions (player_move == best_move)
    before_count = len(positions)
    positions = [p for p in positions if p["player_move"] != p["best_move"]]
    removed = before_count - len(positions)
    if removed:
        data["positions"] = positions
        print(f"  Removed {removed} invalid position(s) (player_move == best_move)")

    # Remove positions where both moves win or both lose (no learning value)
    before_count = len(positions)
    filtered = []
    for p in positions:
        sb = _parse_score_cp(p.get("score_before", ""))
        sa = _parse_score_cp(p.get("score_after", ""))
        if sb is None or sa is None:
            # Non-numeric scores (e.g. tablebase strings) cannot be judged: keep.
            filtered.append(p)
            continue
        # The sign is flipped for Black so positive means good for the player.
        mul = 1 if p.get("player_color") == "white" else -1
        player_before = sb * mul
        player_after = sa * mul
        if player_before > DOMINATED_POSITION_CP and player_after > DOMINATED_POSITION_CP:
            continue
        if player_before < -DOMINATED_POSITION_CP and player_after < -DOMINATED_POSITION_CP:
            continue
        filtered.append(p)
    positions = filtered
    removed_decisive = before_count - len(positions)
    if removed_decisive:
        data["positions"] = positions
        print(f"  Removed {removed_decisive} position(s) already decisive (both win or both lose)")

    # Fix tablebase scores for Black: convert from side-to-move to player perspective
    _tb_flip = {"TB:win": "TB:loss", "TB:loss": "TB:win"}
    tb_fixed = 0
    for pos in positions:
        if "tablebase" not in pos or pos.get("player_color") != "black":
            continue
        for key in ("score_before", "score_after", "score_after_best"):
            val = pos.get(key)
            if val in _tb_flip:
                pos[key] = _tb_flip[val]
                tb_fixed += 1
    if tb_fixed:
        data["positions"] = positions
        print(f"  Fixed {tb_fixed} tablebase score(s) (side-to-move → player perspective)")

    updated = 0
    for pos in positions:
        board = chess.Board(pos["fen"])

        # Tablebase-resolved positions: regenerate from stored tablebase data
        tb_data = pos.get("tablebase")
        if tb_data:
            tb_before = tb_data.get("before")
            tb_after = tb_data.get("after")
            if tb_before:
                tb_res_before = TablebaseResult(
                    category=tb_before["category"],
                    dtz=tb_before.get("dtz"),
                    dtm=tb_before.get("dtm"),
                    best_move=None,
                )
                new_context = tablebase_context(
                    tb_res_before, len(board.piece_map()),
                    pos.get("player_color", "white"),
                )
                if tb_after:
                    tb_res_after = TablebaseResult(
                        category=tb_after["category"],
                        dtz=tb_after.get("dtz"),
                        dtm=tb_after.get("dtm"),
                        best_move=None,
                    )
                    new_explanation = tablebase_explanation(
                        tb_res_before, tb_res_after,
                        pos["player_move"], pos["best_move"],
                    )
                else:
                    # No "after" data stored: keep the existing explanation text.
                    new_explanation = pos.get("explanation", "")
            else:
                # No "before" data stored: leave this position untouched.
                continue
        else:
            # Parse scores to cp (None when the stored string is not numeric)
            score_before_cp = _parse_score_cp(pos.get("score_before", "+0.00"))
            score_after_cp = _parse_score_cp(pos.get("score_after", "+0.00"))

            was_mate = score_before_cp is not None and abs(score_before_cp) >= MATE_CP

            new_explanation = generate_explanation(
                board, pos["player_move"], pos["best_move"],
                pos["cp_loss"], pos["category"],
                was_mate=was_mate, score_after_cp=score_after_cp,
            )
            new_context = generate_context(
                pos["category"], pos["cp_loss"], was_mate, score_after_cp,
                fen=pos["fen"], score_before_cp=score_before_cp,
                player_color=pos.get("player_color", "white"),
            )
        # Fix source if "unknown" and game.id hints at the platform
        game = pos.get("game", {})
        game_id = game.get("id", "")
        if game.get("source") == "unknown":
            if "lichess.org" in game_id.lower():
                game["source"] = "lichess"
            elif "chess.com" in game_id.lower():
                game["source"] = "chess.com"

        if new_explanation != pos.get("explanation") or new_context != pos.get("context"):
            pos["explanation"] = new_explanation
            pos["context"] = new_context
            updated += 1

    atomic_write_json(data_path, data)

    print(f"  Refreshed {updated}/{len(positions)} explanation(s) in {data_path}")
    if updated:
        print("  Run /review-training to verify text quality")

time_pressure_context(player_clock, opponent_clock)

Generate time pressure context string, or empty if not relevant.

Parameters:

Name Type Description Default
player_clock float | None

Player's remaining time in seconds, or None.

required
opponent_clock float | None

Opponent's remaining time in seconds, or None.

required
Source code in src/chess_self_coach/trainer.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def time_pressure_context(
    player_clock: float | None, opponent_clock: float | None,
) -> str:
    """Generate time pressure context string, or empty if not relevant.

    Args:
        player_clock: Player's remaining time in seconds, or None.
        opponent_clock: Opponent's remaining time in seconds, or None.
    """
    if player_clock is None:
        return ""

    p_min = player_clock / 60

    if p_min < 2:
        if opponent_clock and opponent_clock / 60 > p_min * 2:
            o_min = opponent_clock / 60
            return (
                f"You were under severe time pressure "
                f"({p_min:.0f}min left vs {o_min:.0f}min for your opponent)."
            )
        return f"You were under time pressure ({p_min:.0f}min remaining)."

    if opponent_clock and player_clock > opponent_clock * 1.5:
        o_min = opponent_clock / 60
        return (
            f"You had more time ({p_min:.0f}min vs {o_min:.0f}min) "
            f"and could have taken longer on this move."
        )

    return ""

FastAPI backend server for Chess Self-Coach [App] mode.

Serves the PWA with API endpoints for native Stockfish analysis. Replaces the old static-file-only serve_pwa() from trainer.py.

Key design decisions:

- No temp dir: PWA files are served directly from source; sw.js and training_data.json are served via dynamic routes (always fresh, no copy needed).
- Single Stockfish engine instance, guarded by an asyncio.Lock for thread safety.
- Engine crash recovery: auto-restart on EngineTerminatedError.
- Port scanning: tries 8000-8010 if the default port is busy.

AnalysisSettingsResponse

Bases: BaseModel

Response body for GET /api/analysis/settings.

Source code in src/chess_self_coach/server.py
223
224
225
226
227
228
class AnalysisSettingsResponse(BaseModel):
    """Response body for GET /api/analysis/settings.

    Carries three settings fields: ``threads``, ``hash_mb`` and ``limits``.
    """

    # Thread count setting (presumably the engine's thread count — confirm against the endpoint).
    threads: int
    # Hash size setting, in MB going by the field name.
    hash_mb: int
    # Mapping of string keys to {name: number} dicts; the key scheme is not visible in this class.
    limits: dict[str, dict[str, float | int]]

AnalysisStartRequest

Bases: BaseModel

Request body for POST /api/analysis/start.

Source code in src/chess_self_coach/server.py
231
232
233
234
235
236
class AnalysisStartRequest(BaseModel):
    """Request body for POST /api/analysis/start.

    All fields are optional; the defaults are an empty game list, 10 games,
    and no forced re-analysis.
    """

    # Game ids to analyze; defaults to a fresh empty list per request.
    game_ids: list[str] = Field(default_factory=list)
    # Upper bound on games (default 10); how it interacts with game_ids is decided by the job — verify there.
    max_games: int = 10
    # Presumably forces re-analysis of already analyzed games — confirm in the job runner.
    reanalyze_all: bool = False

BestMoveRequest

Bases: BaseModel

Request body for /api/stockfish/bestmove.

Source code in src/chess_self_coach/server.py
166
167
168
169
170
class BestMoveRequest(BaseModel):
    """Request body for /api/stockfish/bestmove."""

    # Position to analyze, as a FEN string.
    fen: str
    # Search depth; validated to lie in 1..30 inclusive, default 18.
    depth: int = Field(ge=1, le=30, default=18)

BestMoveResponse

Bases: BaseModel

Response body for /api/stockfish/bestmove.

Source code in src/chess_self_coach/server.py
173
174
175
176
class BestMoveResponse(BaseModel):
    """Response body for /api/stockfish/bestmove."""

    # The best move as a string; the notation (UCI vs SAN) is set by the endpoint — TODO confirm.
    bestmove: str

ConfigResponse

Bases: BaseModel

Response body for GET /api/config.

Source code in src/chess_self_coach/server.py
187
188
189
190
191
class ConfigResponse(BaseModel):
    """Response body for GET /api/config."""

    # String-to-string mapping; presumably platform name -> username, verify against the config file.
    players: dict[str, str]
    # Numeric analysis settings keyed by name.
    analysis: dict[str, float | int]

ConfigUpdateRequest

Bases: BaseModel

Request body for POST /api/config.

Source code in src/chess_self_coach/server.py
194
195
196
197
198
class ConfigUpdateRequest(BaseModel):
    """Request body for POST /api/config.

    Both sections are optional and default to None.
    """

    # Replacement/update for the players section; None when omitted.
    players: dict[str, str] | None = None
    # Replacement/update for the analysis section; None when omitted.
    analysis: dict[str, float | int] | None = None

GameListResponse

Bases: BaseModel

Response body for GET /api/games.

Source code in src/chess_self_coach/server.py
216
217
218
219
220
class GameListResponse(BaseModel):
    """Response body for GET /api/games."""

    # One summary entry per game.
    games: list[GameSummaryResponse]
    # Fetch timestamp as a string, or None when not available; exact format not visible here.
    fetched_at: str | None = None

GameSummaryResponse

Bases: BaseModel

One game in the game list.

Source code in src/chess_self_coach/server.py
201
202
203
204
205
206
207
208
209
210
211
212
213
class GameSummaryResponse(BaseModel):
    """One game in the game list.

    All fields are required.
    """

    # Identifier of the game.
    game_id: str
    # Name of the White player.
    white: str
    # Name of the Black player.
    black: str
    # Side the tracked player had; presumably "white" or "black" — confirm against the producer.
    player_color: str
    # Game result string.
    result: str
    # Game date string.
    date: str
    # Opening name.
    opening: str
    # Number of moves in the game (whether plies or full moves is not visible here).
    move_count: int
    # Platform the game came from; presumably "lichess", "chess.com" or "unknown" — verify.
    source: str
    # Whether analysis data exists for this game.
    analyzed: bool

JobStartResponse

Bases: BaseModel

Response body for job start endpoints.

Source code in src/chess_self_coach/server.py
239
240
241
242
class JobStartResponse(BaseModel):
    """Response body for job start endpoints."""

    # Identifier of the background job that was started.
    job_id: str

StatusResponse

Bases: BaseModel

Response body for /api/status.

Source code in src/chess_self_coach/server.py
179
180
181
182
183
184
class StatusResponse(BaseModel):
    """Response body for /api/status."""

    # Server mode label; defaults to "app".
    mode: str = "app"
    # Version string (presumably the chess-self-coach package version — confirm at the endpoint).
    version: str
    # Version string reported for the Stockfish engine.
    stockfish_version: str

analysis_data() async

Serve analysis data directly from project root (always fresh).

Source code in src/chess_self_coach/server.py
645
646
647
648
649
650
651
@app.get("/analysis_data.json")
async def analysis_data():
    """Serve the analysis data file straight from the project root.

    The file is read from disk on each request, so the response is always
    fresh. Responds with HTTP 404 when the file does not exist.
    """
    data_file = _project_root / DATA_DIR / ANALYSIS_DATA_FILE
    if data_file.exists():
        return FileResponse(data_file, media_type="application/json")
    raise HTTPException(status_code=404, detail="No analysis data. Run: chess-self-coach train --analyze")

analysis_start(req) async

Start a background full game analysis job.

Source code in src/chess_self_coach/server.py
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
@app.post("/api/analysis/start", status_code=202)
async def analysis_start(req: AnalysisStartRequest) -> JobStartResponse:
    """Start a background full game analysis job.

    Registers a new job in the module-level ``_current_job`` slot and runs
    ``_run_analysis_job`` in a daemon thread. Only one job may run at a time.

    Args:
        req: Job parameters (game ids, max games, re-analyze flag).

    Returns:
        The id of the newly started job.

    Raises:
        HTTPException: 409 if a job is already running.
    """
    global _current_job

    # Check-and-set under the lock so two requests cannot both start a job.
    with _job_lock:
        if _current_job and _current_job["status"] == "running":
            raise HTTPException(status_code=409, detail="A job is already running")

        # Short id: the first 8 characters of a random UUID.
        job_id = str(uuid.uuid4())[:8]
        _current_job = {
            "id": job_id,
            "status": "running",
            "queue": asyncio.Queue(),
            "cancel": threading.Event(),
            "params": {
                "game_ids": req.game_ids,
                "max_games": req.max_games,
                "reanalyze_all": req.reanalyze_all,
            },
        }

    # Inside a coroutine the running loop is the one to hand to the worker;
    # get_running_loop() states that explicitly.
    loop = asyncio.get_running_loop()
    thread = threading.Thread(target=_run_analysis_job, args=(job_id, loop), daemon=True)
    thread.start()

    return JobStartResponse(job_id=job_id)

bestmove(req) async

Compute the best move for a position using native Stockfish.

Source code in src/chess_self_coach/server.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
@app.post("/api/stockfish/bestmove")
async def bestmove(req: BestMoveRequest) -> BestMoveResponse:
    """Compute the best move for a position using native Stockfish.

    Args:
        req: Request carrying the FEN of the position and the search depth.

    Returns:
        BestMoveResponse with the engine's move rendered via ``str()``.

    Raises:
        HTTPException: 503 when no engine is available or it crashed and
            cannot be restarted; 400 when the FEN cannot be parsed.
    """
    global _engine

    if _engine is None:
        raise HTTPException(status_code=503, detail="Stockfish not available")

    try:
        board = chess.Board(req.fen)
    except ValueError as err:
        # Chain the cause so the original parse error stays in the traceback.
        raise HTTPException(status_code=400, detail=f"Invalid FEN: {err}") from err

    limit = chess.engine.Limit(depth=req.depth)

    async with _engine_lock:
        try:
            result = await asyncio.to_thread(_engine.play, board, limit)
        except chess.engine.EngineTerminatedError as err:
            # Engine crashed — restart and retry
            _log.warning("Stockfish crashed, restarting...")
            if _sf_path:
                _engine = chess.engine.SimpleEngine.popen_uci(str(_sf_path))
                result = await asyncio.to_thread(_engine.play, board, limit)
            else:
                raise HTTPException(
                    status_code=503, detail="Stockfish crashed and cannot restart"
                ) from err

    return BestMoveResponse(bestmove=str(result.move))

classifications_data() async

Serve pre-computed move classifications (always fresh).

Source code in src/chess_self_coach/server.py
654
655
656
657
658
659
660
@app.get("/classifications_data.json")
async def classifications_data():
    """Serve the pre-computed move classifications file from the data directory.

    Raises:
        HTTPException: 404 when the classifications file does not exist.
    """
    data_file = _project_root / DATA_DIR / CLASSIFICATIONS_DATA_FILE
    if data_file.exists():
        return FileResponse(data_file, media_type="application/json")
    raise HTTPException(status_code=404, detail="No classifications data")

games_fetch(max_games=200) async

Fetch games from Lichess/chess.com and cache locally.

Source code in src/chess_self_coach/server.py
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
@app.post("/api/games/fetch")
async def games_fetch(max_games: int = 200) -> GameListResponse:
    """Fetch games for the configured players and store them in the local cache.

    Args:
        max_games: Passed through to ``fetch_and_cache_games``.

    Returns:
        GameListResponse with the returned summaries and the cache timestamp.

    Raises:
        HTTPException: 400 when neither a Lichess nor a chess.com player is configured.
    """
    from chess_self_coach.config import load_config
    from chess_self_coach.game_cache import fetch_and_cache_games, load_game_cache

    configured_players = load_config().get("players", {})
    lichess_name = configured_players.get("lichess", "")
    chesscom_name = configured_players.get("chesscom")

    if not (lichess_name or chesscom_name):
        raise HTTPException(
            status_code=400,
            detail="No player configured. Run 'chess-self-coach setup'.",
        )

    # The fetch is blocking network work, so it runs in a worker thread.
    fetched = await asyncio.to_thread(
        fetch_and_cache_games, lichess_name, chesscom_name, max_games
    )
    cache_snapshot = load_game_cache()

    game_rows = [GameSummaryResponse(**item.to_dict()) for item in fetched]
    return GameListResponse(games=game_rows, fetched_at=cache_snapshot.get("fetched_at"))

games_list(limit=20) async

Return unified game list (cached + analyzed), sorted by date.

Source code in src/chess_self_coach/server.py
360
361
362
363
364
365
366
367
368
369
370
371
@app.get("/api/games")
async def games_list(limit: int = 20) -> GameListResponse:
    """Return the unified game list (cached + analyzed), capped at ``limit``.

    Args:
        limit: Maximum number of games to return.

    Returns:
        GameListResponse with the game summaries and the cache timestamp.
    """
    from chess_self_coach.game_cache import get_unified_game_list, load_game_cache

    unified = get_unified_game_list(limit=limit)
    cache_snapshot = load_game_cache()

    game_rows = [GameSummaryResponse(**item.to_dict()) for item in unified]
    return GameListResponse(games=game_rows, fetched_at=cache_snapshot.get("fetched_at"))

get_analysis_settings() async

Return current analysis engine settings (with 'auto' resolved).

Source code in src/chess_self_coach/server.py
377
378
379
380
381
382
383
384
385
386
387
388
389
390
@app.get("/api/analysis/settings")
async def get_analysis_settings() -> AnalysisSettingsResponse:
    """Return the analysis engine settings derived from the stored config.

    Returns:
        AnalysisSettingsResponse with threads, hash size and limits.
    """
    from chess_self_coach.analysis import AnalysisSettings
    from chess_self_coach.config import load_config

    resolved = AnalysisSettings.from_config(load_config()).to_dict()
    threads = resolved["threads"]
    hash_mb = resolved["hash_mb"]
    limits = resolved["limits"]
    return AnalysisSettingsResponse(threads=threads, hash_mb=hash_mb, limits=limits)

get_config() async

Return editable config fields (players, analysis).

Source code in src/chess_self_coach/server.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
@app.get("/api/config")
async def get_config() -> ConfigResponse:
    """Return the editable parts of the config: ``players`` and ``analysis``.

    Returns:
        ConfigResponse; each section defaults to an empty dict when absent.

    Raises:
        HTTPException: 404 when the config file does not exist.
    """
    config_file = _project_root / DATA_DIR / CONFIG_FILE
    if not config_file.exists():
        raise HTTPException(status_code=404, detail="config.json not found")

    with open(config_file) as handle:
        stored = json.load(handle)

    players = stored.get("players", {})
    analysis = stored.get("analysis", {})
    return ConfigResponse(players=players, analysis=analysis)

job_cancel(job_id) async

Request cancellation of a running job.

Source code in src/chess_self_coach/server.py
622
623
624
625
626
627
628
629
630
@app.post("/api/jobs/{job_id}/cancel", status_code=202)
async def job_cancel(job_id: str):
    """Set the cancel flag of the running job identified by ``job_id``.

    Raises:
        HTTPException: 404 when there is no job with this ID; 409 when the
            job exists but is not running.
    """
    job = _current_job
    if not job or job["id"] != job_id:
        raise HTTPException(status_code=404, detail="Job not found")
    if job["status"] != "running":
        raise HTTPException(status_code=409, detail="Job is not running")

    cancel_flag = job["cancel"]
    cancel_flag.set()
    return {"status": "cancelling"}

job_current() async

Return the current job ID and status, if any.

Source code in src/chess_self_coach/server.py
613
614
615
616
617
618
619
@app.get("/api/jobs/current")
async def job_current():
    """Report the ID, status and game IDs of the current job.

    All values are empty (None / None / []) when no job has been started.
    """
    job = _current_job
    if job:
        job_params = job.get("params", {})
        return {
            "job_id": job["id"],
            "status": job["status"],
            "game_ids": job_params.get("game_ids", []),
        }
    return {"job_id": None, "status": None, "game_ids": []}

job_events(job_id) async

Stream job progress events via SSE.

Source code in src/chess_self_coach/server.py
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
@app.get("/api/jobs/{job_id}/events")
async def job_events(job_id: str):
    """Stream the progress events of job ``job_id`` as server-sent events.

    Raises:
        HTTPException: 404 when there is no job with this ID.
    """
    job = _current_job
    if not job or job["id"] != job_id:
        raise HTTPException(status_code=404, detail="Job not found")

    pending = job["queue"]

    async def event_generator():
        # A None item on the queue marks the end of the stream.
        while (item := await pending.get()) is not None:
            yield {"data": json.dumps(item)}

    return EventSourceResponse(event_generator())

lifespan(app) async

Manage Stockfish engine lifecycle.

Source code in src/chess_self_coach/server.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage Stockfish engine lifecycle.

    On entry, resolves the project root and PWA directory and tries to start
    Stockfish. If ``find_stockfish`` exits (SystemExit), the app still starts
    with ``_engine`` set to None. On exit, the engine is shut down.

    Args:
        app: The FastAPI application (unused here).
    """
    global _engine, _sf_path, _sf_version, _project_root, _pwa_dir

    _project_root = _find_project_root()
    _pwa_dir = _project_root / "pwa"

    try:
        _sf_path = find_stockfish()
        _engine = chess.engine.SimpleEngine.popen_uci(str(_sf_path))
        # Parse version from engine id
        _sf_version = _engine.id.get("name", "unknown")
        _log.info("Stockfish: %s", _sf_version)
    except SystemExit:
        _log.warning("Stockfish not found. /api/stockfish/* will be unavailable.")
        _engine = None

    try:
        yield
    finally:
        # Run the shutdown even when an exception is thrown into the context
        # manager at the yield point, so the engine process is not leaked.
        if _engine:
            _engine.quit()
            _engine = None

run_server()

Start the FastAPI server and open the browser.

Called by cli.py when user runs chess-self-coach (no subcommand) or chess-self-coach train --serve.

Source code in src/chess_self_coach/server.py
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
def run_server() -> None:
    """Launch the FastAPI app with uvicorn and open it in the browser.

    Invoked by cli.py for `chess-self-coach` (no subcommand) and for
    `chess-self-coach train --serve`.
    """
    import uvicorn

    port = _find_available_port()
    url = f"http://localhost:{port}"

    print(f"  Serving PWA at {url} (v{__version__})")
    print("  Press Ctrl+C to stop\n")

    # Open the browser after a short delay; uvicorn.run() below blocks.
    browser_timer = threading.Timer(0.5, webbrowser.open, args=(url,))
    browser_timer.start()

    uvicorn.run(app, host="localhost", port=port, log_level="warning")

service_worker() async

Serve service worker with version injected on-the-fly.

Source code in src/chess_self_coach/server.py
663
664
665
666
667
668
669
670
671
672
@app.get("/sw.js")
async def service_worker():
    """Serve ``sw.js`` with its ``__VERSION__`` placeholder filled in per request.

    The substituted value is the package version plus the current Unix time.

    Raises:
        HTTPException: 404 when ``sw.js`` is missing from the PWA directory.
    """
    worker_file = _pwa_dir / "sw.js"
    if not worker_file.exists():
        raise HTTPException(status_code=404, detail="Service worker not found")

    script = worker_file.read_text()
    stamp = f"{__version__}-{int(time.time())}"
    return Response(script.replace("__VERSION__", stamp), media_type="application/javascript")

status() async

Return app status for mode detection by the PWA.

Source code in src/chess_self_coach/server.py
248
249
250
251
252
253
254
@app.get("/api/status")
async def status() -> StatusResponse:
    """Report the package and Stockfish versions so the PWA can detect its mode."""
    payload = StatusResponse(version=__version__, stockfish_version=_sf_version)
    return payload

training_data() async

Serve training data directly from project root (always fresh).

Source code in src/chess_self_coach/server.py
636
637
638
639
640
641
642
@app.get("/training_data.json")
async def training_data():
    """Serve the training data file straight from the project data directory.

    Raises:
        HTTPException: 404 when the training data file does not exist.
    """
    data_file = _project_root / DATA_DIR / TRAINING_DATA_FILE
    if data_file.exists():
        return FileResponse(data_file, media_type="application/json")
    raise HTTPException(status_code=404, detail="No training data. Run: chess-self-coach train --prepare")

update_analysis_settings(req) async

Save analysis engine settings to config.json.

Source code in src/chess_self_coach/server.py
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
@app.post("/api/analysis/settings")
async def update_analysis_settings(req: AnalysisSettingsResponse) -> AnalysisSettingsResponse:
    """Store the analysis engine settings under ``analysis_engine`` in the config.

    Args:
        req: Settings to persist (threads, hash size, limits).

    Returns:
        The request object, unchanged.

    Raises:
        HTTPException: 404 when the config file does not exist.
    """
    config_file = _project_root / DATA_DIR / CONFIG_FILE
    if not config_file.exists():
        raise HTTPException(status_code=404, detail="config.json not found")

    with open(config_file) as handle:
        stored = json.load(handle)

    engine_settings = {
        "threads": req.threads,
        "hash_mb": req.hash_mb,
        "limits": req.limits,
    }
    stored["analysis_engine"] = engine_settings
    atomic_write_json(config_file, stored)

    return req

update_config(req) async

Update editable config fields (players, analysis). Preserves other fields.

Source code in src/chess_self_coach/server.py
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
@app.post("/api/config")
async def update_config(req: ConfigUpdateRequest) -> ConfigResponse:
    """Overwrite the ``players`` and/or ``analysis`` sections of the config.

    Sections left as None in the request, and all other config keys, are
    kept as they are.

    Args:
        req: Sections to replace.

    Returns:
        ConfigResponse reflecting the config after the update.

    Raises:
        HTTPException: 404 when the config file does not exist.
    """
    config_file = _project_root / DATA_DIR / CONFIG_FILE
    if not config_file.exists():
        raise HTTPException(status_code=404, detail="config.json not found")

    with open(config_file) as handle:
        stored = json.load(handle)

    for section in ("players", "analysis"):
        replacement = getattr(req, section)
        if replacement is not None:
            stored[section] = replacement

    atomic_write_json(config_file, stored)

    players = stored.get("players", {})
    analysis = stored.get("analysis", {})
    return ConfigResponse(players=players, analysis=analysis)

Game fetching from Lichess and chess.com.

fetch_chesscom_games(username, max_games=100)

Fetch rated rapid+ games from chess.com public API.

Uses the chessdotcom package (installed as chess.com) to access the chess.com public API.

Parameters:

Name Type Description Default
username str

Chess.com username.

required
max_games int

Maximum number of games to fetch.

100

Returns:

Type Description
list[Game]

List of parsed chess.pgn.Game objects.

Raises:

Type Description
SystemExit

If fetching fails.

Source code in src/chess_self_coach/importer.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def fetch_chesscom_games(username: str, max_games: int = 100) -> list[chess.pgn.Game]:
    """Download a player's rated rapid/classical/daily games from chess.com.

    Monthly archives are walked newest-first, and the games inside each
    month are read in reverse, until ``max_games`` games are collected.
    Relies on the chessdotcom package (installed as chess.com).

    Args:
        username: Chess.com username.
        max_games: Maximum number of games to fetch.

    Returns:
        List of parsed chess.pgn.Game objects.

    Raises:
        SystemExit: If fetching fails.
    """
    try:
        from chessdotcom import get_player_game_archives
    except ImportError:
        error_exit(
            "chess.com package not installed.",
            hint="Install it with: uv add chess.com",
        )

    from chessdotcom import Client as ChesscomClient

    user_agent = "chess-self-coach (github.com/Bobain/chess-self-coach)"
    ChesscomClient.request_config["headers"]["User-Agent"] = user_agent

    accepted_time_classes = ("rapid", "classical", "daily")
    collected = []
    try:
        archive_listing = get_player_game_archives(username)
        monthly_urls = archive_listing.json.get("archives", [])

        from chessdotcom import get_player_games_by_month

        # Newest archive first
        for monthly_url in reversed(monthly_urls):
            if len(collected) >= max_games:
                break

            # The archive URL ends in .../YYYY/MM
            segments = monthly_url.rstrip("/").split("/")
            year = segments[-2]
            month = segments[-1]
            payload = get_player_games_by_month(username, year, month)

            for record in reversed(payload.json.get("games", [])):
                if len(collected) >= max_games:
                    break

                is_rated = record.get("rated", False)
                if not (is_rated and record.get("time_class", "") in accepted_time_classes):
                    continue

                raw_pgn = record.get("pgn", "")
                if not raw_pgn:
                    continue

                parsed = chess.pgn.read_game(io.StringIO(raw_pgn))
                if parsed:
                    collected.append(parsed)

        print(f"  Fetched {len(collected)} game(s) from chess.com for {username}")
    except Exception as e:
        error_exit(
            f"Failed to fetch chess.com games: {e}",
            hint=f"Check that username '{username}' exists on chess.com.",
        )

    return collected

fetch_lichess_games(username, max_games=100)

Fetch rated rapid+ games from Lichess.

Parameters:

Name Type Description Default
username str

Lichess username.

required
max_games int

Maximum number of games to fetch.

100

Returns:

Type Description
list[Game]

List of parsed chess.pgn.Game objects.

Raises:

Type Description
SystemExit

If fetching fails.

Source code in src/chess_self_coach/importer.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def fetch_lichess_games(username: str, max_games: int = 100) -> list[chess.pgn.Game]:
    """Fetch rated rapid+ games from Lichess.

    Exports rated rapid, classical and correspondence games as PGN, parses
    them, and drops any game whose ``Variant`` header is not "Standard".

    Args:
        username: Lichess username.
        max_games: Maximum number of games to fetch.

    Returns:
        List of parsed chess.pgn.Game objects.

    Raises:
        SystemExit: If fetching fails.
    """
    token = load_lichess_token()
    session = berserk.TokenSession(token)
    client = berserk.Client(session=session)

    games = []
    try:
        exported = client.games.export_by_player(
            username,
            max=max_games,
            rated=True,
            perf_type="rapid,classical,correspondence",
            as_pgn=True,
            clocks=True,
        )

        pgn_text = "".join(exported) if hasattr(exported, "__iter__") else str(exported)
        pgn_io = io.StringIO(pgn_text)

        # Suppress chess.pgn parse errors (variant games produce illegal SAN warnings)
        chess_logger = logging.getLogger("chess.pgn")
        old_level = chess_logger.level
        chess_logger.setLevel(logging.CRITICAL)

        skipped_variants = 0
        try:
            while True:
                game = chess.pgn.read_game(pgn_io)
                if game is None:
                    break
                variant = game.headers.get("Variant", "Standard")
                if variant != "Standard":
                    site = game.headers.get("Site", "?")
                    print(f"  ⚠ Skipping {variant} game: {site}", file=sys.stderr)
                    skipped_variants += 1
                    continue
                games.append(game)
        finally:
            # Restore the logger level even if parsing raises, so the
            # suppression never outlives this call.
            chess_logger.setLevel(old_level)

        msg = f"  Fetched {len(games)} game(s) from Lichess for {username}"
        if skipped_variants:
            msg += f" ({skipped_variants} variant game(s) excluded)"
        print(msg)
    except berserk.exceptions.ResponseError as e:
        error_exit(
            f"Failed to fetch Lichess games: {e}",
            hint=f"Check that username '{username}' exists on Lichess.",
        )

    return games

Game cache: fetch games from APIs and cache locally for later analysis.

Decouples game fetching (fast, API-only) from Stockfish analysis (slow). The cache stores raw PGN text so games can be deserialized on demand.

GameSummary dataclass

Summary of a game for the game list UI.

Attributes:

Name Type Description
game_id str

Unique game identifier (URL from PGN headers).

white str

White player name.

black str

Black player name.

player_color str

Color the player was playing ("white" or "black").

result str

Game result ("1-0", "0-1", "1/2-1/2").

date str

Game date string (YYYY.MM.DD).

opening str

Opening name if known.

move_count int

Number of half-moves in the game.

source str

Platform ("lichess" or "chess.com").

analyzed bool

Whether the game has been analyzed with Stockfish.

Source code in src/chess_self_coach/game_cache.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@dataclass
class GameSummary:
    """Summary of a game for the game list UI.

    The field names match those of the ``GameSummaryResponse`` API model,
    which is built from ``to_dict()``.

    Attributes:
        game_id: Unique game identifier (URL from PGN headers).
        white: White player name.
        black: Black player name.
        player_color: Color the player was playing ("white" or "black").
        result: Game result ("1-0", "0-1", "1/2-1/2").
        date: Game date string (YYYY.MM.DD).
        opening: Opening name if known.
        move_count: Number of half-moves in the game.
        source: Platform ("lichess" or "chess.com").
        analyzed: Whether the game has been analyzed with Stockfish.
    """

    game_id: str
    white: str
    black: str
    player_color: str
    result: str
    date: str
    opening: str
    move_count: int
    source: str
    analyzed: bool

    def to_dict(self) -> dict:
        """Serialize to dict for JSON API response.

        Returns:
            Dict keyed by field name, produced by ``dataclasses.asdict``.
        """
        return asdict(self)

to_dict()

Serialize to dict for JSON API response.

Source code in src/chess_self_coach/game_cache.py
51
52
53
def to_dict(self) -> dict:
    """Serialize to dict for JSON API response.

    Returns:
        Dict keyed by field name, produced by ``dataclasses.asdict``.
    """
    return asdict(self)

fetch_and_cache_games(lichess_user, chesscom_user, max_games=200)

Fetch games from Lichess and chess.com, cache locally.

Parameters:

Name Type Description Default
lichess_user str

Lichess username.

required
chesscom_user str | None

Optional chess.com username.

required
max_games int

Maximum games to fetch per source.

200

Returns:

Type Description
list[GameSummary]

List of GameSummary: newly fetched games first, followed by games that were already in the cache.

Source code in src/chess_self_coach/game_cache.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
def fetch_and_cache_games(
    lichess_user: str,
    chesscom_user: str | None,
    max_games: int = 200,
) -> list[GameSummary]:
    """Fetch games from Lichess and chess.com, cache locally.

    Newly fetched games are merged into the existing cache (entries already
    cached are kept untouched) and the merged cache is written back with a
    fresh ``fetched_at`` timestamp.

    Args:
        lichess_user: Lichess username; skipped when empty.
        chesscom_user: Optional chess.com username.
        max_games: Base number of games to request per source. The size of
            the existing cache is added to it so that duplicates already
            cached do not eat into the budget.

    Returns:
        Summaries of the newly cached games, followed by summaries of the
        games that were already in the cache (these carry ``analyzed=False``).
    """
    from chess_self_coach.importer import fetch_chesscom_games, fetch_lichess_games

    # Fetch more than requested to account for duplicates already in cache
    existing_cache = load_game_cache()
    existing_games: dict[str, dict] = existing_cache.get("games", {})
    fetch_count = max_games + len(existing_games)

    all_games: list[chess.pgn.Game] = []
    if lichess_user:
        all_games.extend(fetch_lichess_games(lichess_user, fetch_count))
    if chesscom_user:
        all_games.extend(fetch_chesscom_games(chesscom_user, fetch_count))

    cache_path = fetched_games_path()

    # Merge with existing cache (preserve previously fetched games)
    cache_games: dict[str, dict] = dict(existing_games)
    new_count = 0

    summaries: list[GameSummary] = []

    for game in all_games:
        game_id = _game_id_from_headers(game)
        # Skip games without an ID and games already cached (or already
        # added earlier in this same batch).
        if not game_id or game_id in cache_games:
            continue

        player_color = _determine_player_color(game, lichess_user, chesscom_user)
        if player_color is None:
            continue

        pgn_text = _game_to_pgn_text(game)
        summary = _game_to_summary(game, game_id, player_color)

        cache_games[game_id] = {
            "pgn": pgn_text,
            "headers": dict(game.headers),
            "player_color": player_color,
            "move_count": summary.move_count,
            "source": summary.source,
        }
        summaries.append(summary)
        new_count += 1

    # Also build summaries for existing cached games (so API returns all).
    # A set makes the "already summarized" check O(1) per cached entry
    # instead of a linear scan over all summaries.
    summarized_ids = {s.game_id for s in summaries}
    for game_id, entry in existing_games.items():
        if game_id in summarized_ids:
            continue
        headers = entry.get("headers", {})
        summaries.append(GameSummary(
            game_id=game_id,
            white=headers.get("White", "?"),
            black=headers.get("Black", "?"),
            date=headers.get("Date", ""),
            result=headers.get("Result", "*"),
            player_color=entry.get("player_color", "white"),
            opening=headers.get("Opening", ""),
            move_count=entry.get("move_count", 0),
            source=entry.get("source", ""),
            analyzed=False,
        ))

    # Write merged cache
    cache_data = {
        "fetched_at": datetime.now(timezone.utc).isoformat(),
        "games": cache_games,
    }
    atomic_write_json(cache_path, cache_data)

    _log.info("Cached %d games (%d new) to %s", len(cache_games), new_count, cache_path)
    return summaries

get_cached_game(game_id)

Deserialize a single game from the cache.

Parameters:

Name Type Description Default
game_id str

Game URL identifier.

required

Returns:

Type Description
Game | None

Parsed chess.pgn.Game, or None if not in cache.

Source code in src/chess_self_coach/game_cache.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
def get_cached_game(game_id: str) -> chess.pgn.Game | None:
    """Parse one cached game back into a chess.pgn.Game.

    Args:
        game_id: Game URL identifier.

    Returns:
        The parsed game, or None when the cache has no entry for ``game_id``.
    """
    cached_games = load_game_cache().get("games", {})
    record = cached_games.get(game_id)
    if record is None:
        return None
    return chess.pgn.read_game(io.StringIO(record["pgn"]))

get_unified_game_list(limit=20)

Merge fetched games cache with analysis data into a unified list.

Analysis data takes precedence (richer info, marked as analyzed). Sorted by date descending, capped at limit.

Parameters:

Name Type Description Default
limit int

Maximum number of games to return.

20

Returns:

Type Description
list[GameSummary]

List of GameSummary, most recent first.

Source code in src/chess_self_coach/game_cache.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
def _opening_from_explorer(moves: list[dict]) -> str:
    """Return the opening name of the first played move that has one, else ""."""
    for move in moves:
        explorer = move.get("opening_explorer")
        if not explorer or not explorer.get("moves"):
            continue
        played_uci = move.get("move_uci")
        for candidate in explorer["moves"]:
            if not candidate:
                continue
            name = (candidate.get("opening") or {}).get("name")
            # Only the explorer entry matching the move actually played counts.
            if name and candidate.get("uci") == played_uci:
                return name
    return ""


def get_unified_game_list(limit: int = 20) -> list[GameSummary]:
    """Merge fetched games cache with analysis data into a unified list.

    Analysis data takes precedence (richer info, marked as analyzed).
    Sorted by date descending, capped at limit. The sort compares the date
    strings as text.

    Args:
        limit: Maximum number of games to return.

    Returns:
        List of GameSummary, most recent first.
    """
    from chess_self_coach.analysis import load_analysis_data

    analyzed_games = load_analysis_data(analysis_data_path()).get("games", {})
    cached_games = load_game_cache().get("games", {})

    summaries: list[GameSummary] = []

    # Analyzed games
    for game_id, game_data in analyzed_games.items():
        headers = game_data.get("headers", {})
        moves = game_data.get("moves", [])
        opening = _opening_from_explorer(moves)

        # Header keys are looked up lower-case first, then capitalized.
        summaries.append(
            GameSummary(
                game_id=game_id,
                white=headers.get("white", headers.get("White", "?")),
                black=headers.get("black", headers.get("Black", "?")),
                player_color=game_data.get("player_color", "white"),
                result=headers.get("result", headers.get("Result", "*")),
                date=headers.get("date", headers.get("Date", "")),
                opening=opening or headers.get("opening", headers.get("Opening", "")),
                move_count=len(moves),
                source=_detect_source(game_id),
                analyzed=True,
            )
        )

    # Cached-only games (not yet analyzed)
    for game_id, entry in cached_games.items():
        if game_id in analyzed_games:
            continue
        headers = entry.get("headers", {})
        summaries.append(
            GameSummary(
                game_id=game_id,
                white=headers.get("White", "?"),
                black=headers.get("Black", "?"),
                player_color=entry.get("player_color", "white"),
                result=headers.get("Result", "*"),
                date=headers.get("Date", ""),
                opening=headers.get("Opening", headers.get("ECO", "")),
                move_count=entry.get("move_count", 0),
                source=entry.get("source", _detect_source(game_id)),
                analyzed=False,
            )
        )

    # Sort by date descending
    summaries.sort(key=lambda s: s.date, reverse=True)
    return summaries[:limit]

load_game_cache()

Load the fetched games cache.

Returns:

Type Description
dict

Cache dict with 'fetched_at' and 'games' keys, or empty structure.

Source code in src/chess_self_coach/game_cache.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
def load_game_cache() -> dict:
    """Read the fetched-games cache file from disk.

    Returns:
        The parsed cache dict, or ``{"fetched_at": None, "games": {}}`` when
        the file is missing or cannot be read or parsed.
    """
    cache_file = fetched_games_path()
    if cache_file.exists():
        try:
            with open(cache_file) as handle:
                return json.load(handle)
        except (json.JSONDecodeError, OSError):
            _log.warning("Failed to load game cache from %s", cache_file)
    return {"fetched_at": None, "games": {}}

Lichess Masters Opening Explorer API client.

Queries the Lichess Masters opening explorer (FIDE 2200+ OTB games) to identify opening names, ECO codes, and move popularity statistics for each position. Used during Phase 1 analysis to detect when players depart from known theory.

A move is considered "in opening" (in_opening=True) only if it appears in the Masters database.

ExplorerAPIError

Bases: Exception

Raised when the Opening Explorer API is unavailable or rate-limited.

Source code in src/chess_self_coach/opening_explorer.py
37
38
class ExplorerAPIError(Exception):
    """Raised when the Opening Explorer API is unavailable or rate-limited.

    ``query_opening`` documents raising this when both the primary and the
    fallback endpoints fail.
    """

query_opening(fen, token)

Query the Lichess Masters Opening Explorer for a position.

Parameters:

Name Type Description Default
fen str

FEN string of the position to query.

required
token str

Lichess API personal access token.

required

Returns:

Type Description
dict | None

Dict with {opening, white, draws, black, moves[]} or None if position

dict | None

has zero games in Masters database.

Raises:

Type Description
ExplorerAPIError

If both primary/fallback endpoints fail.

Source code in src/chess_self_coach/opening_explorer.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def query_opening(fen: str, token: str) -> dict | None:
    """Look up a single position in the Lichess Masters Opening Explorer.

    Thin wrapper: the request is delegated to ``_query_endpoint`` with the
    Masters primary and fallback endpoints and the FEN as the only query
    parameter.

    Args:
        fen: FEN string of the position to query.
        token: Lichess API personal access token.

    Returns:
        Dict with {opening, white, draws, black, moves[]} or None if position
        has zero games in Masters database.

    Raises:
        ExplorerAPIError: If both primary/fallback endpoints fail.
    """
    request_params = {"fen": fen}
    return _query_endpoint(
        fen,
        token,
        _MASTERS_PRIMARY,
        _MASTERS_FALLBACK,
        request_params,
    )

query_opening_sequence(fens_and_moves, token, *, existing_results=None)

Query the Masters Opening Explorer for a sequence of positions.

Queries Masters until the played move is not found (theory departure). Each returned dict has _source="masters".

When existing_results is provided (re-analysis), positions that already have Masters data are preserved without API calls. Querying resumes from the first position without Masters data (breakpoint re-testing).

Parameters:

Name Type Description Default
fens_and_moves list[tuple[str, str]]

List of (fen_before, move_uci) tuples for each ply.

required
token str

Lichess API personal access token.

required
existing_results list[dict | None] | None

Previous opening_explorer data per ply (from a prior analysis). Entries with _source="masters" are kept as-is.

None

Returns:

Type Description
list[dict | None]

List of explorer responses (same length as input). None entries mark the ply where the played move left Masters theory and every ply after it.

Source code in src/chess_self_coach/opening_explorer.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def query_opening_sequence(
    fens_and_moves: list[tuple[str, str]],
    token: str,
    *,
    existing_results: list[dict | None] | None = None,
) -> list[dict | None]:
    """Walk a game ply by ply through the Masters Opening Explorer.

    Each position is looked up until the move actually played is no longer
    among the moves Masters knows for that position (theory departure).
    Responses that are kept are tagged with ``_source="masters"``.

    On re-analysis, pass the previous per-ply data as *existing_results*:
    entries already tagged ``_source="masters"`` are reused verbatim without
    an API call, and querying picks up again at the first ply that lacks
    Masters data.

    Args:
        fens_and_moves: List of (fen_before, move_uci) tuples for each ply.
        token: Lichess API personal access token.
        existing_results: Previous opening_explorer data per ply (from a prior
            analysis).  Entries with ``_source="masters"`` are kept as-is.

    Returns:
        List with one entry per input ply. The departing ply and every ply
        after it are None.
    """
    prior = [] if existing_results is None else existing_results
    collected: list[dict | None] = []
    departed = False

    for ply, (fen, move_uci) in enumerate(fens_and_moves):
        if departed:
            # Theory already left: nothing further is looked up.
            collected.append(None)
            continue

        cached = prior[ply] if ply < len(prior) else None
        if cached is not None and cached.get("_source") == "masters":
            # Re-analysis: reuse the stored Masters answer, no API call.
            collected.append(cached)
            continue

        response = query_opening(fen, token)
        still_in_theory = response is not None and move_uci in {
            m["uci"] for m in response.get("moves", [])
        }
        if still_in_theory:
            response["_source"] = "masters"
            collected.append(response)
        else:
            # Either the position has no Masters games or the played move
            # is not among the known ones.
            departed = True
            collected.append(None)

        # Pause after every real API request.
        time.sleep(_RATE_LIMIT_DELAY)

    return collected

refresh_opening_data(path=None, token=None)

Re-query Masters for existing games to update in_opening flags.

For each game, iterates through moves that currently have in_opening=True. Queries Masters for each position until Masters departs, then sets in_opening=False for that move and all subsequent moves.

Does NOT re-run Stockfish or touch eval data.

Parameters:

Name Type Description Default
path Path | None

Path to analysis_data.json. Defaults to config.

None
token str | None

Lichess API token. Required for API calls.

None

Returns:

Type Description
dict[str, Any]

Dict with stats: {games_updated, moves_changed, total_masters}.

Source code in src/chess_self_coach/opening_explorer.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
def refresh_opening_data(
    path: Path | None = None,
    token: str | None = None,
) -> dict[str, Any]:
    """Re-query Masters for existing games to update in_opening flags.

    For each game, iterates through moves that currently have in_opening=True.
    Queries Masters for each position until Masters departs, then sets
    in_opening=False for that move and all subsequent moves.

    Does NOT re-run Stockfish or touch eval data.

    The analysis file is rewritten with atomic_write_json() at the end of a
    complete run, and also before re-raising when an ExplorerAPIError
    interrupts the run (partial progress is kept). Progress and a final
    summary are printed to stdout.

    Args:
        path: Path to analysis_data.json. Defaults to config.
        token: Lichess API token. Required for API calls.

    Returns:
        Dict with stats: {games_updated, moves_changed, total_masters}.
        games_updated counts games in which at least one in_opening flag was
        flipped, moves_changed counts flipped flags, total_masters counts
        moves confirmed by Masters.

    Raises:
        ExplorerAPIError: Propagated from query_opening() after the partial
            progress has been written.
    """
    if path is None:
        path = analysis_data_path()

    with open(path) as f:
        data = json.load(f)

    games = data.get("games", {})
    stats = {"games_updated": 0, "moves_changed": 0, "total_masters": 0}

    try:
        for i, (game_id, game_data) in enumerate(games.items()):
            moves = game_data.get("moves", [])
            # Per-game state: set once the played move leaves Masters theory.
            masters_departed = False
            game_changed = False

            for move in moves:
                old_in_opening = move.get("in_opening", False)

                if masters_departed:
                    # After Masters departure: force in_opening=False
                    if old_in_opening:
                        move["in_opening"] = False
                        game_changed = True
                        stats["moves_changed"] += 1
                    continue

                if not old_in_opening:
                    # Already out of opening — stop checking this game
                    break

                # Was in_opening=True: check Masters
                fen = move.get("fen_before")
                move_uci = move.get("move_uci")
                # NOTE(review): a missing token lands in this branch too, so
                # calling without a token clears the in_opening flag from the
                # first in-opening move of every game onwards without making
                # any API call — confirm this is intended.
                if not fen or not move_uci or not token:
                    masters_departed = True
                    move["in_opening"] = False
                    game_changed = True
                    stats["moves_changed"] += 1
                    continue

                # ExplorerAPIError propagates — never silently treat API
                # failure as "not in theory"
                md = query_opening(fen, token)
                if md is not None:
                    known = {m["uci"] for m in md.get("moves", [])}
                    if move_uci in known:
                        # Masters confirms this move — update explorer data
                        # (does not mark the game as changed: only flag
                        # flips count towards games_updated)
                        md["_source"] = "masters"
                        move["opening_explorer"] = md
                        stats["total_masters"] += 1
                    else:
                        # Move not in Masters — departure
                        masters_departed = True
                        move["in_opening"] = False
                        game_changed = True
                        stats["moves_changed"] += 1
                else:
                    # Position not in Masters (0 games) — departure
                    masters_departed = True
                    move["in_opening"] = False
                    game_changed = True
                    stats["moves_changed"] += 1

                # Pause after every API query (skipped by the `continue`
                # paths above, which make no request).
                time.sleep(_RATE_LIMIT_DELAY)

            if game_changed:
                stats["games_updated"] += 1

            # Progress line every 50 games and after the last one.
            if (i + 1) % 50 == 0 or i + 1 == len(games):
                print(f"  Refreshed {i + 1}/{len(games)} games...")

    except ExplorerAPIError as exc:
        # Persist whatever was modified in memory so far, then let the
        # caller see the failure.
        print(f"\n  ERROR: API failure — {exc}")
        print(f"  Saving partial progress ({stats['games_updated']} games updated so far)...")
        atomic_write_json(path, data)
        raise

    atomic_write_json(path, data)
    print(f"  Done: {stats['games_updated']} games updated, "
          f"{stats['moves_changed']} moves changed, "
          f"{stats['total_masters']} moves confirmed in Masters")
    return stats

Lichess tablebase API client for perfect endgame analysis.

Probes the public Lichess tablebase API (no token required) for positions with <= 7 pieces. Returns mathematically exact Win/Draw/Loss verdicts instead of heuristic Stockfish evaluations.

Transient errors (429, 5xx, timeouts) are retried with exponential backoff. Only a genuine 404 (position not in database) returns None.

API: https://tablebase.lichess.ovh/standard?fen=<FEN>

Coverage: up to 7 pieces (Syzygy tablebases)

RateLimitExhaustedError

Bases: Exception

Raised when the API rate limit persists after maximum backoff (128 min).

Source code in src/chess_self_coach/tablebase.py
46
47
class RateLimitExhaustedError(Exception):
    """Raised when the API rate limit persists after maximum backoff (128 min).

    NOTE(review): presumably raised by the ``_fetch_tablebase`` retry loop
    that probe_position() and probe_position_full() call — confirm, since
    their docstrings say they retry indefinitely.
    """

TablebaseResult dataclass

Result from a tablebase probe.

Source code in src/chess_self_coach/tablebase.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
@dataclass
class TablebaseResult:
    """Simplified outcome of one tablebase probe.

    Built by probe_position() from the API response: the raw category, the
    DTZ / DTM values as reported, and the SAN of the first listed move.
    """

    category: str
    dtz: int | None
    dtm: int | None
    best_move: str | None

    @property
    def tier(self) -> str:
        """Coarse WIN / DRAW / LOSS tier for ``category``.

        Categories missing from ``_CATEGORY_TIERS`` fall back to "DRAW".
        """
        return _CATEGORY_TIERS.get(self.category, "DRAW")

    def format_verdict(self) -> str:
        """Render the verdict as text, e.g. 'win, mate in 23' or 'draw'.

        A non-zero DTM takes precedence over a non-zero DTZ; with neither
        available the bare lower-cased tier is returned.
        """
        label = self.tier.lower()
        if self.dtm not in (None, 0):
            return f"{label}, mate in {abs(self.dtm)}"
        if self.dtz not in (None, 0):
            return f"{label} (DTZ {abs(self.dtz)})"
        return label

tier property

WDL tier: WIN, DRAW, or LOSS.

format_verdict()

Human-readable verdict, e.g. 'win, mate in 23' or 'draw'.

Source code in src/chess_self_coach/tablebase.py
79
80
81
82
83
84
85
86
def format_verdict(self) -> str:
    """Render the verdict as text, e.g. 'win, mate in 23' or 'draw'.

    A non-zero DTM takes precedence over a non-zero DTZ; with neither
    available the bare lower-cased tier is returned.
    """
    label = self.tier.lower()
    if self.dtm not in (None, 0):
        return f"{label}, mate in {abs(self.dtm)}"
    if self.dtz not in (None, 0):
        return f"{label} (DTZ {abs(self.dtz)})"
    return label

probe_position(fen)

Probe the Lichess tablebase API for a position.

Retries indefinitely on transient errors (429, 5xx, network) with exponential backoff. Only returns None for too many pieces or 404.

Parameters:

Name Type Description Default
fen str

FEN string of the position.

required

Returns:

Type Description
TablebaseResult | None

TablebaseResult if the position has <= 7 pieces and is in the tablebase, None otherwise.

Source code in src/chess_self_coach/tablebase.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
def probe_position(fen: str) -> TablebaseResult | None:
    """Ask the Lichess tablebase API for a simplified verdict on a position.

    Retries indefinitely on transient errors (429, 5xx, network) with
    exponential backoff. Only returns None for too many pieces or 404.

    Args:
        fen: FEN string of the position.

    Returns:
        A TablebaseResult when the position has at most MAX_PIECES pieces,
        the fetch returns data, and that data carries a known category;
        None otherwise.
    """
    # Positions above the piece limit are never sent to the API.
    if len(chess.Board(fen).piece_map()) > MAX_PIECES:
        return None

    payload = _fetch_tablebase(fen)
    if payload is None:
        return None

    category = payload.get("category")
    if not (category and category in _CATEGORY_TIERS):
        _log.warning("    tablebase %s → unknown category %r", fen[:40], category)
        return None

    # The first entry of the moves list is taken as the best move.
    candidate_moves = payload.get("moves", [])
    top_san = candidate_moves[0].get("san") if candidate_moves else None

    return TablebaseResult(
        category=category,
        dtz=payload.get("dtz"),
        dtm=payload.get("dtm"),
        best_move=top_san,
    )

probe_position_full(fen, on_wait=None)

Probe the Lichess tablebase API and return the complete response.

Unlike probe_position() which returns a simplified TablebaseResult, this returns the raw API response including all legal moves with their WDL/DTM/DTZ data — suitable for storing in analysis_data.json.

Retries indefinitely on transient errors (429, 5xx, network) with exponential backoff. Only returns None for too many pieces or 404.

Parameters:

Name Type Description Default
fen str

FEN string of the position.

required
on_wait Callable[[int, float], None] | None

Optional callback(attempt, delay_seconds) called before each retry sleep, so callers can surface the wait to the UI.

None

Returns:

Type Description
dict[str, Any] | None

Full API response dict (category, dtm, dtz, precise_dtz, dtw, dtc, checkmate, stalemate, moves[]) or None if unavailable.

Source code in src/chess_self_coach/tablebase.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
def probe_position_full(
    fen: str,
    on_wait: Callable[[int, float], None] | None = None,
) -> dict[str, Any] | None:
    """Ask the Lichess tablebase API for a position and keep the raw answer.

    Where probe_position() condenses the answer into a TablebaseResult, this
    variant hands back the API response dict itself (all legal moves with
    their WDL/DTM/DTZ data), which makes it suitable for storing in
    analysis_data.json. A computed "tier" key is added to that dict.

    Retries indefinitely on transient errors (429, 5xx, network) with
    exponential backoff. Only returns None for too many pieces or 404.

    Args:
        fen: FEN string of the position.
        on_wait: Optional callback(attempt, delay_seconds) called before
            each retry sleep, so callers can surface the wait to the UI.

    Returns:
        Full API response dict (category, dtm, dtz, precise_dtz, dtw, dtc,
        checkmate, stalemate, moves[]) plus "tier", or None if unavailable
        or if the category is not a known one.
    """
    piece_total = len(chess.Board(fen).piece_map())
    if piece_total > MAX_PIECES:
        return None

    response = _fetch_tablebase(fen, on_wait=on_wait)
    if response is None:
        return None

    category = response.get("category")
    if category and category in _CATEGORY_TIERS:
        # Add computed tier for convenience (mutates the response in place).
        response["tier"] = _CATEGORY_TIERS[category]
        return response

    _log.warning("    tablebase %s → unknown category %r", fen[:40], category)
    return None

tablebase_context(before, piece_count, player_color='white')

Generate context string for a tablebase-resolved position.

Parameters:

Name Type Description Default
before TablebaseResult

Tablebase result for the position before the move.

required
piece_count int

Number of pieces on the board.

required
player_color str

"white" or "black".

'white'

Returns:

Type Description
str

Context string shown before the player answers.

Source code in src/chess_self_coach/tablebase.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
def tablebase_context(
    before: TablebaseResult, piece_count: int, player_color: str = "white"
) -> str:
    """Build the one-line context shown for a tablebase-resolved position.

    Args:
        before: Tablebase result for the position before the move.
        piece_count: Number of pieces on the board.
        player_color: "white" or "black".

    Returns:
        Context string shown before the player answers.
    """
    outlook_by_tier = {
        "WIN": "you had a winning position",
        "LOSS": "you were in a difficult position",
    }
    verdict = before.format_verdict()
    # Any tier other than WIN / LOSS is described as an equal position.
    advantage = outlook_by_tier.get(before.tier, "the position was equal")
    color_label = f"playing as {player_color.capitalize()}"
    return f"Endgame ({piece_count} pieces), {color_label}, {advantage}. Tablebase: theoretical {verdict}."

tablebase_explanation(before, after, actual_san, best_san)

Generate explanation for a tablebase-detected mistake.

Parameters:

Name Type Description Default
before TablebaseResult

Tablebase result before the move.

required
after TablebaseResult

Tablebase result after the move.

required
actual_san str

The move the player made.

required
best_san str | None

The best move according to the tablebase.

required

Returns:

Type Description
str

Explanation string.

Source code in src/chess_self_coach/tablebase.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
def tablebase_explanation(
    before: TablebaseResult,
    after: TablebaseResult,
    actual_san: str,
    best_san: str | None,
) -> str:
    """Explain a tablebase-detected mistake in up to three sentences.

    Args:
        before: Tablebase result before the move.
        after: Tablebase result after the move.
        actual_san: The move the player made.
        best_san: The best move according to the tablebase; when falsy the
            "correct move" sentence is left out.

    Returns:
        Explanation string (sentences joined by single spaces).
    """
    sentences = [
        f"Tablebase: the position was a theoretical {before.format_verdict()}.",
        f"Your move {actual_san} turns it into a {after.format_verdict()}.",
    ]
    if best_san:
        sentences.append(f"The correct move was {best_san}.")
    return " ".join(sentences)

Lichess Cloud Evaluation API client.

Queries the Lichess cloud database for pre-computed Stockfish analysis. Opening positions have near-perfect coverage at depth 50-70, making this much faster than running Stockfish locally.

Transient errors (429, 5xx, timeouts) are retried with exponential backoff. Only a genuine 404 (position not in database) returns None.

API: https://lichess.org/api#tag/Analysis

RateLimitExhaustedError

Bases: Exception

Raised when the API rate limit persists after maximum backoff (128 min).

Source code in src/chess_self_coach/cloud_eval.py
40
41
class RateLimitExhaustedError(Exception):
    """Raised when the API rate limit persists after maximum backoff (128 min).

    query_cloud_eval() raises this the second time its retry delay reaches
    the backoff cap. It does so for every retried failure — unexpected HTTP
    statuses as well as ``requests.RequestException`` / ``ValueError`` raised
    inside the request block — not only for HTTP 429.
    """

query_cloud_eval(fen, multi_pv=1, on_wait=None, log_label='')

Query the Lichess Cloud database for a position.

Retries on transient errors (429, 5xx, network) with exponential backoff up to 128 minutes. Raises RateLimitExhaustedError if still failing after a retry at max backoff. Only returns None for a genuine cache miss (404).

Parameters:

Name Type Description Default
fen str

FEN string of the position to query.

required
multi_pv int

Number of principal variations to request.

1
on_wait Callable[[int, float], None] | None

Optional callback(attempt, delay_seconds) called before each retry sleep, so callers can surface the wait to the UI.

None
log_label str

Optional prefix for log messages (e.g. "[ply 12 after] ") to identify which step triggered the query.

''

Returns:

Type Description
dict[str, Any] | None

API response dict with {fen, knodes, depth, pvs[]} or None if the position is not in the database (404).

Raises:

Type Description
RateLimitExhaustedError

If rate limit persists after max backoff.

Source code in src/chess_self_coach/cloud_eval.py
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def query_cloud_eval(
    fen: str,
    multi_pv: int = 1,
    on_wait: Callable[[int, float], None] | None = None,
    log_label: str = "",
) -> dict[str, Any] | None:
    """Query the Lichess Cloud database for a position.

    Retries on transient errors (429, 5xx, network) with exponential backoff
    up to 128 minutes. Raises RateLimitExhaustedError if still failing after
    a retry at max backoff. Only returns None for a genuine cache miss (404).

    Consecutive requests are spaced by at least ``_RATE_LIMIT_DELAY`` seconds
    using the module-level ``_last_request_time`` timestamp.

    Args:
        fen: FEN string of the position to query.
        multi_pv: Number of principal variations to request.
        on_wait: Optional callback(attempt, delay_seconds) called before
            each retry sleep, so callers can surface the wait to the UI.
        log_label: Optional prefix for log messages (e.g. "[ply 12 after] ")
            to identify which step triggered the query.

    Returns:
        API response dict with {fen, knodes, depth, pvs[]} or None if
        the position is not in the database (404).

    Raises:
        RateLimitExhaustedError: If the failure persists after a retry at
            max backoff (HTTP errors and network errors alike).
    """
    global _last_request_time
    params = {"fen": fen, "multiPv": multi_pv}
    attempt = 0
    # Number of times the computed delay has hit the cap; the second hit
    # gives up instead of sleeping again.
    at_max_count = 0

    while True:
        # Rate limit: wait between consecutive requests
        since_last = time.time() - _last_request_time
        if since_last < _RATE_LIMIT_DELAY:
            time.sleep(_RATE_LIMIT_DELAY - since_last)

        t0 = time.time()
        try:
            _last_request_time = time.time()
            resp = requests.get(_URL, params=params, timeout=_TIMEOUT)

            if resp.status_code == 200:
                result = resp.json()
                _log.info(
                    "    cloud %s%s → hit (%.0fms, depth=%s)",
                    log_label, fen[:40], (time.time() - t0) * 1000,
                    result.get("depth"),
                )
                return result

            if resp.status_code == 404:
                _log.info(
                    "    cloud %s%s → miss/404 (%.0fms)",
                    log_label, fen[:40], (time.time() - t0) * 1000,
                )
                return None

            # Transient error (429, 5xx, etc.) — retry with backoff
            delay = min(_BACKOFF_BASE * (2 ** attempt), _BACKOFF_MAX)
            if delay >= _BACKOFF_MAX:
                at_max_count += 1
                if at_max_count > 1:
                    msg = f"Cloud eval rate limit exhausted after {attempt + 1} retries (fen={fen[:40]})"
                    _log.error("    %s", msg)
                    raise RateLimitExhaustedError(msg)
            retry_after = resp.headers.get("Retry-After", "?")
            _log.warning(
                "    cloud %s%s → HTTP %d (Retry-After: %s), retrying in %.0fs (attempt %d)",
                log_label, fen[:40], resp.status_code, retry_after, delay,
                attempt + 1,
            )
            if on_wait:
                on_wait(attempt + 1, delay)
            time.sleep(delay)
            attempt += 1

        except (requests.RequestException, ValueError) as exc:
            # Network failure, or a ValueError raised inside the request
            # block (e.g. while decoding a 200 body): same backoff policy as
            # for HTTP errors.
            delay = min(_BACKOFF_BASE * (2 ** attempt), _BACKOFF_MAX)
            if delay >= _BACKOFF_MAX:
                at_max_count += 1
                if at_max_count > 1:
                    msg = f"Cloud eval rate limit exhausted after {attempt + 1} retries (fen={fen[:40]})"
                    _log.error("    %s", msg)
                    raise RateLimitExhaustedError(msg) from exc
            # Fix: the FEN and the exception text used to be fused together
            # ("%s%s%s"); separate them like the other log lines do.
            _log.warning(
                "    cloud %s%s → %s, retrying in %.0fs (attempt %d)",
                log_label, fen[:40], exc, delay, attempt + 1,
            )
            if on_wait:
                on_wait(attempt + 1, delay)
            time.sleep(delay)
            attempt += 1

Syzygy endgame tablebase management.

Download, locate, and validate local Syzygy tablebases (3-5 pieces) for use by Stockfish via the SyzygyPath UCI option.

download_syzygy(target_dir=None)

Download 3-5 piece Syzygy tablebases (~1 GB).

Parameters:

Name Type Description Default
target_dir Path | None

Where to store tables. Defaults to ~/.local/share/syzygy/.

None

Returns:

Type Description
Path

Path to the download directory.

Raises:

Type Description
FileNotFoundError

If wget is not installed.

CalledProcessError

If download fails.

Source code in src/chess_self_coach/syzygy.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def download_syzygy(target_dir: Path | None = None) -> Path:
    """Fetch the 3-5 piece Syzygy tablebases (~1 GB) with wget.

    The download is a recursive, resumable wget run (``-c -r``) restricted
    to ``*.rtbw`` / ``*.rtbz`` files from the configured mirror.

    Args:
        target_dir: Where to store tables. Defaults to ~/.local/share/syzygy/.

    Returns:
        Path to the download directory.

    Raises:
        FileNotFoundError: If wget is not installed.
        subprocess.CalledProcessError: If download fails.
    """
    destination = _DEFAULT_DIR if target_dir is None else target_dir

    # Check for the tool before touching the filesystem.
    wget_path = shutil.which("wget")
    if not wget_path:
        raise FileNotFoundError(
            "wget is required to download Syzygy tables.\n"
            "  Install with: sudo apt install wget  (Linux) or  brew install wget  (macOS)"
        )

    destination.mkdir(parents=True, exist_ok=True)

    wget_command = [
        "wget", "-c", "-r", "-np", "-nH", "--cut-dirs=2",
        "-e", "robots=off", "-A", "*.rtbw,*.rtbz",
        "-P", str(destination),
        _MIRROR,
    ]
    subprocess.run(wget_command, check=True)

    return destination

find_syzygy(config=None)

Find a directory containing Syzygy tablebase files.

Parameters:

Name Type Description Default
config dict | None

Optional config dict. Reads config["syzygy"]["path"] first.

None

Returns:

Type Description
Path | None

Path to a directory with .rtbw files, or None if not found.

Source code in src/chess_self_coach/syzygy.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def find_syzygy(config: dict | None = None) -> Path | None:
    """Locate the first directory that holds Syzygy tablebase files.

    Args:
        config: Optional config dict. Reads config["syzygy"]["path"] first;
            that path (with ``~`` expanded) is tried before the built-in
            search paths.

    Returns:
        The first candidate accepted by ``_is_valid_syzygy_dir``, or None if
        no candidate qualifies.
    """
    search_order: list[Path] = []

    custom = config.get("syzygy", {}).get("path") if config else None
    if custom:
        search_order.append(Path(custom).expanduser())
    search_order.extend(_SEARCH_PATHS)

    # Lazily test candidates in order; stop at the first valid one.
    return next(
        (candidate for candidate in search_order if _is_valid_syzygy_dir(candidate)),
        None,
    )

syzygy_status(config=None)

Report status of local Syzygy tablebases.

Parameters:

Name Type Description Default
config dict | None

Optional config dict for custom path lookup.

None

Returns:

Type Description
dict

Dict with path, found (bool), wdl_count, dtz_count, total_size_mb.

Source code in src/chess_self_coach/syzygy.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def syzygy_status(config: dict | None = None) -> dict:
    """Summarise what is installed in the local Syzygy directory.

    Args:
        config: Optional config dict for custom path lookup.

    Returns:
        Dict with path, found (bool), wdl_count, dtz_count, total_size_mb.
        When no directory is found, path is None, found is False and the
        counts and size are 0.
    """
    location = find_syzygy(config)
    if location is None:
        return {"path": None, "found": False, "wdl_count": 0, "dtz_count": 0, "total_size_mb": 0}

    wdl_tables = list(location.glob("*.rtbw"))
    dtz_tables = list(location.glob("*.rtbz"))

    size_in_bytes = 0
    for table in (*wdl_tables, *dtz_tables):
        size_in_bytes += table.stat().st_size

    return {
        "path": str(location),
        "found": True,
        "wdl_count": len(wdl_tables),
        "dtz_count": len(dtz_tables),
        "total_size_mb": round(size_in_bytes / (1024 * 1024), 1),
    }

Self-update mechanism for chess-self-coach.

check_stockfish_update()

Check GitHub for a newer Stockfish release.

Compares the locally installed Stockfish version against the latest GitHub release of official-stockfish/Stockfish.

Returns:

Type Description
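tuple[bool, str | None, str | None]

Tuple of (update_available, installed_version, latest_version). Never crashes: if Stockfish cannot be located, returns (False, None, None); if the GitHub lookup fails, returns (False, installed_version, None).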

Source code in src/chess_self_coach/updater.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def check_stockfish_update() -> tuple[bool, str | None, str | None]:
    """Check GitHub for a newer Stockfish release.

    Compares the locally installed Stockfish version against the latest
    GitHub release of official-stockfish/Stockfish. Versions are compared
    numerically component by component (so 9 < 18 and 17 < 17.1), not as
    strings.

    Returns:
        Tuple of (update_available, installed_version, latest_version).
        Never crashes:

        - Stockfish cannot be located or its version cannot be read:
          (False, None, None).
        - The GitHub lookup fails or yields no usable tag:
          (False, installed_version, None).
        - Either version is not purely numeric (e.g. a development build),
          so no ordering can be established:
          (False, installed_version, latest_version).
    """
    from chess_self_coach.config import find_stockfish, check_stockfish_version

    def _version_tuple(text: str) -> tuple[int, ...] | None:
        """Parse '18' or '17.1' into (18,) / (17, 1); None if not numeric."""
        try:
            return tuple(int(part) for part in text.split("."))
        except ValueError:
            return None

    try:
        sf_path = find_stockfish()
        installed = check_stockfish_version(sf_path)
        # installed is like "Stockfish 18" or "Stockfish 17"
        installed_num = installed.replace("Stockfish", "").strip()
    except (SystemExit, Exception):
        # find_stockfish() signals "not found" via SystemExit; any other
        # failure while reading the version is treated the same way so the
        # documented "never crashes" contract holds.
        return False, None, None

    try:
        resp = urllib.request.urlopen(
            "https://api.github.com/repos/official-stockfish/Stockfish/releases/latest",
            timeout=3,
        )
        try:
            data = json.loads(resp.read())
        finally:
            # Release the connection even if reading or parsing fails.
            resp.close()
        tag = data.get("tag_name", "")
        # Tags are like "sf_17", "sf_18", or "stockfish-18"
        latest_num = tag.replace("sf_", "").replace("stockfish-", "").strip()
        if not latest_num or not installed_num:
            return False, installed, None

        latest_label = f"Stockfish {latest_num}"
        latest_ver = _version_tuple(latest_num)
        installed_ver = _version_tuple(installed_num)
        if latest_ver is None or installed_ver is None:
            # Cannot order non-numeric versions reliably: report no update.
            return False, installed, latest_label
        return latest_ver > installed_ver, installed, latest_label
    except Exception:
        # Best-effort check: network / JSON problems must not surface.
        return False, installed, None

check_update()

Check PyPI for a newer version.

Returns:

Type Description
tuple[bool, str | None]

Tuple of (update_available, latest_version). On network error, returns (False, None) — never crashes.

Source code in src/chess_self_coach/updater.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def check_update() -> tuple[bool, str | None]:
    """Check PyPI for a newer version.

    Fetches the project's JSON metadata from PyPI and compares the published
    version with the installed ``__version__`` as tuples of integers.

    Returns:
        Tuple of (update_available, latest_version). On any failure during
        the lookup (network error, malformed response, or a version that is
        not purely dotted integers) returns (False, None) — never crashes.
    """
    from chess_self_coach import __version__

    def _parse_ver(v: str) -> tuple[int, ...]:
        """Turn '1.2.3' into (1, 2, 3); raises ValueError otherwise."""
        return tuple(int(x) for x in v.split("."))

    try:
        resp = urllib.request.urlopen(
            "https://pypi.org/pypi/chess-self-coach/json", timeout=3,
        )
        try:
            data = json.loads(resp.read())
        finally:
            # Release the connection even if reading or parsing fails.
            resp.close()
        latest = data["info"]["version"]
        # Compare as tuples to detect only newer versions
        return (_parse_ver(latest) > _parse_ver(__version__)), latest
    except Exception:
        # Best-effort check: never let an update probe break the caller.
        return False, None

update()

Update chess-self-coach to the latest version via uv, pipx, or pip.

Source code in src/chess_self_coach/updater.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def update() -> None:
    """Upgrade chess-self-coach with the first package manager that works.

    Tries uv, then pipx, then pip (through the current interpreter). A tool
    that is not on PATH is skipped; a tool that runs but fails falls through
    to the next one. Exits with status 1 when none succeeds.
    """
    upgrade_commands = {
        "uv": ["uv", "tool", "upgrade", "chess-self-coach"],
        "pipx": ["pipx", "upgrade", "chess-self-coach"],
        "pip": [sys.executable, "-m", "pip", "install", "--upgrade", "chess-self-coach"],
    }

    for tool_name, command in upgrade_commands.items():
        if shutil.which(command[0]):
            print(f"Updating via {tool_name}...")
            outcome = subprocess.run(command, capture_output=True, text=True)
            if outcome.returncode == 0:
                installed = _get_installed_version()
                print(f"\n✓ Updated to v{installed}!")
                return
            # Tool found but failed — try next one
            print(f"{tool_name} failed, trying next method...")

    print("Update failed: no working package manager found.", file=sys.stderr)
    sys.exit(1)

Shared constants and types for chess analysis engine.

Single source of truth for values used across multiple modules (analysis.py, trainer.py, tablebase.py).

EvalDict

Bases: TypedDict

Standard evaluation dictionary returned by all eval extraction functions.

Used by _extract_eval (Stockfish), _tb_to_eval (tablebase), and _cloud_eval_to_eval (Lichess Cloud Eval).

Source code in src/chess_self_coach/constants.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class EvalDict(TypedDict):
    """Standard evaluation dictionary returned by all eval extraction functions.

    Used by _extract_eval (Stockfish), _tb_to_eval (tablebase),
    and _cloud_eval_to_eval (Lichess Cloud Eval).

    Every key is declared as required (the class does not set
    ``total=False``); most values may be None.

    NOTE(review): several field names look like they mirror UCI "info"
    output (depth, seldepth, nodes, nps, tbhits, hashfull) — confirm units
    and point-of-view conventions against the producers before relying on
    them.
    """

    # Score: presumably centipawns, or a mate distance when is_mate is set
    # — TODO confirm sign convention with the producers.
    score_cp: int | None
    is_mate: bool
    mate_in: int | None
    # Search statistics; presumably None when the source does not report them.
    depth: int | None
    seldepth: int | None
    nodes: int | None
    nps: int | None
    time_ms: int | None
    tbhits: int | None
    hashfull: int | None
    # Principal variation in SAN and UCI notation (lists, never None).
    pv_san: list[str]
    pv_uci: list[str]
    # Best move in SAN and UCI notation.
    best_move_san: str | None
    best_move_uci: str | None