# One-way sync

{% tabs %}
{% tab title="End users" %}
One-way sync means: **read from source, write to target**.

It is the safest mode.

#### When to use it

Use one-way when you want a clear source of truth.

Typical pattern:

* Media server → tracker
* One feature at a time

#### Recommended workflow

1. Enable **Dry run** first.
2. Enable **Add**.
3. Keep **Remove** off until you trust matching.
4. Expand to more features and pairs slowly.

#### Common causes of “why did nothing happen?”

* The pair feature is disabled.
* Adds/removes are disabled.
* A provider is down or auth failed.
* Drop guard replaced a suspicious snapshot with the previous baseline (safe behavior).

Related:

* Safer defaults: [Best practice](/getting-started/best-practices.md)
* Safety model: [Guardrails](/blueprint-architecture/orchestrator/guardrails.md)
* Where state is stored: [State](/blueprint-architecture/orchestrator/state.md)

{% endtab %}

{% tab title="Power users" %}
One-way sync means: **read from `src`, write to `dst`**.

It is the safest mode and the recommended first setup.

<details>

<summary>Implementation notes</summary>

Primary implementation: `cw_platform/orchestrator/_pairs_oneway.py`

Key helpers: `_snapshots.py`, `_planner.py`, `_applier.py`, `_pairs_blocklist.py`, `_pairs_massdelete.py`, `_unresolved.py`, `_blackbox.py`, `_phantoms.py`

</details>

Related runtime docs:

* [Orchestrator](/blueprint-architecture/orchestrator.md)
* [Snapshots (indices)](/blueprint-architecture/orchestrator/snapshots.md)
* [Blackbox](/blueprint-architecture/orchestrator/blackbox.md)

***

### Overview

For each feature, one-way sync runs these steps:

1. Gate by provider availability and health.
2. Build snapshots for `src` and `dst`.
3. Load previous baselines and manual policy.
4. Apply drop-guard and per-feature filters.
5. Plan adds/removes (or rating upserts/unrates).
6. Apply guardrails (mass delete, blocklists, phantoms).
7. Write changes in chunks with retries.
8. Persist updated baselines, checkpoints, and run metadata.

***

### Technical reference

#### Entry point

`run_one_way_feature(ctx, src, dst, feature=..., fcfg=..., health_map=...)`

Emits (high level):

* `feature:start`
* plan/apply events
* `feature:done`

Returns a summary object like:

```json
{
  "ok": true,
  "added": 12,
  "removed": 3,
  "skipped": 0,
  "unresolved": 1,
  "errors": 0,
  "res_add": {},
  "res_remove": {}
}
```

***

#### Step 0 — Provider + health gating

Hard gates and early exits:

* Missing provider ops:
  * If `ctx.providers[src]` or `ctx.providers[dst]` is missing → `ok:false`.
* Auth failed:
  * If health status is `auth_failed` for either side → skip the pair early.
* Feature unsupported:
  * If capability says unsupported (`capabilities.features[feature] == false`) → emit `feature:unsupported`.
  * If health says feature not ok (`health.features[feature] == false`) → emit `feature:unsupported`.
  * Returns `ok:true` with no ops.
* Source down:
  * If `src` is `down` → emit `writes:skipped reason=source_down` and return.
* Destination down:
  * Planning still happens.
  * Writes are skipped.
  * Items are typically recorded as unresolved.

Flags:

* `allow_adds`: from `fcfg.add` or `sync.enable_add` (default `true`)
* `allow_removes`: from `fcfg.remove` or `sync.enable_remove` (default `false`)
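
The gating order and flag fallbacks above can be sketched as two small functions. This is a minimal sketch, not the orchestrator's code. The outcome labels, the `health` / `caps` dict shapes, checking capabilities on both sides, and the precedence (feature config first, then `sync`, then the default) are all assumptions.

```python
from typing import Any, Mapping, Tuple

def gate_pair(feature: str, providers: Mapping[str, Any], src: str, dst: str,
              health: Mapping[str, Mapping[str, Any]],
              caps: Mapping[str, Mapping[str, Any]]) -> str:
    """Return a hypothetical outcome label for Step 0."""
    if src not in providers or dst not in providers:
        return "error"            # missing provider ops -> ok:false
    h_src, h_dst = health.get(src, {}), health.get(dst, {})
    if "auth_failed" in (h_src.get("status"), h_dst.get("status")):
        return "skip"             # skip the pair early
    for side in (src, dst):       # assumption: both sides must support the feature
        cap_ok = caps.get(side, {}).get("features", {}).get(feature, True)
        health_ok = health.get(side, {}).get("features", {}).get(feature, True)
        if cap_ok is False or health_ok is False:
            return "unsupported"  # emit feature:unsupported, return ok:true
    if h_src.get("status") == "down":
        return "source_down"      # emit writes:skipped reason=source_down
    if h_dst.get("status") == "down":
        return "plan_only"        # plan, skip writes, record unresolved
    return "run"

def resolve_flags(fcfg: Mapping[str, Any], sync: Mapping[str, Any]) -> Tuple[bool, bool]:
    # Assumed precedence: feature config, then sync config, then the default.
    allow_adds = fcfg.get("add", sync.get("enable_add", True))
    allow_removes = fcfg.get("remove", sync.get("enable_remove", False))
    return bool(allow_adds), bool(allow_removes)
```

With empty configs, `resolve_flags({}, {})` yields adds on and removes off, matching the documented defaults.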

***

#### Step 1 — Build snapshots (src + dst)

Only the two providers in the pair are passed to the snapshot builder:

```py
snaps = build_snapshots_for_feature(feature, config, providers={src:src_ops, dst:dst_ops}, ...)
src_cur = snaps[src] or {}
dst_cur = snaps[dst] or {}
```

Snapshot caching:

* uses `ctx.snap_cache` + `ctx.snap_ttl_sec`
* `_bust_snapshot(dst)` is called after successful writes to prevent using stale cached snapshots later in the same run
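
A TTL snapshot cache with an explicit bust can be sketched as follows. The `SnapshotCache` class and keying entries by `(provider, feature)` are hypothetical; the real `ctx.snap_cache` may be shaped differently. The injectable clock only exists to make expiry easy to test.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple

class SnapshotCache:
    """Hypothetical TTL cache keyed by (provider, feature)."""

    def __init__(self, ttl_sec: float, clock: Callable[[], float] = time.monotonic):
        self.ttl_sec = ttl_sec
        self._clock = clock
        self._entries: Dict[Tuple[str, str], Tuple[float, Dict[str, Any]]] = {}

    def get(self, provider: str, feature: str) -> Optional[Dict[str, Any]]:
        hit = self._entries.get((provider, feature))
        if hit is None:
            return None
        stored_at, snap = hit
        if self._clock() - stored_at > self.ttl_sec:
            del self._entries[(provider, feature)]  # expired: force a rebuild
            return None
        return snap

    def put(self, provider: str, feature: str, snap: Dict[str, Any]) -> None:
        self._entries[(provider, feature)] = (self._clock(), snap)

    def bust(self, provider: str) -> None:
        # Drop every cached snapshot for this provider, like _bust_snapshot(dst).
        for key in [k for k in self._entries if k[0] == provider]:
            del self._entries[key]
```

Busting by provider rather than by feature is a deliberately coarse choice in this sketch: after a write, any cached view of that destination may be stale.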

***

#### Step 2 — Load previous baselines + manual policy

Loads `state.json` (merged with `state.manual.json` via `StateStore.load_state()`):

* `prev_src` baseline: `state.providers[src][feature].baseline.items`
* `prev_dst` baseline: `state.providers[dst][feature].baseline.items`

Manual policy is read from **the source provider’s** state node:

```py
manual_adds, manual_blocks = manual_policy(prev_state, src, feature)
```

* manual adds: a dict of `canonical_key -> item`
* manual blocks: a set of tokens (keys / ids / title-year tokens)

Manual adds are merged into `src_idx` during planning (Step 7). Manual blocks are applied after blocklist filtering (Step 12).

***

#### Step 3 — Drop guard (optional)

Controlled by:

* `sync.drop_guard` (bool)
* runtime knobs:
  * `runtime.suspect_min_prev` (default 20)
  * `runtime.suspect_shrink_ratio` (default 0.10)
  * `runtime.suspect_debug` (default True)

If enabled:

* computes `prev_cp` from state and `now_cp` via `module_checkpoint(ops, cfg, feature)`
* calls `coerce_suspect_snapshot(...)` for both src and dst

Result:

* `eff_src` and `eff_dst` are either the current snapshot or, when the snapshot looks suspect, the previous baseline
* emits `snapshot:suspect` (from `_snapshots.py`) plus a local `dbg("snapshot.guard", ...)` debug event

If disabled:

* `eff_src = src_cur`, `eff_dst = dst_cur`
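
The drop-guard decision can be sketched like this, under stated assumptions: a snapshot is treated as suspect when the baseline has at least `suspect_min_prev` items, the current snapshot has shrunk to `suspect_shrink_ratio` of the baseline or less, and the provider checkpoint has not moved. `coerce_if_suspect` is a hypothetical stand-in for `coerce_suspect_snapshot(...)`, whose exact heuristic may differ.

```python
from typing import Any, Dict, Optional

def coerce_if_suspect(cur: Dict[str, Any], prev: Dict[str, Any],
                      prev_cp: Optional[str], now_cp: Optional[str],
                      min_prev: int = 20, shrink_ratio: float = 0.10) -> Dict[str, Any]:
    """Return the index to trust: the current snapshot or the previous baseline."""
    if len(prev) < min_prev:
        return cur                    # baseline too small to judge a shrink
    shrank = len(cur) <= len(prev) * shrink_ratio
    # Assumption: a big shrink is believable only if the checkpoint advanced.
    checkpoint_moved = now_cp is not None and now_cp != prev_cp
    if shrank and not checkpoint_moved:
        return prev                   # coerce back to baseline
    return cur
```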

***

#### Step 4 — Library whitelists (history/ratings only)

Library filtering applies when `feature in ("history", "ratings")`.

Whitelist sources (in order):

1. feature config: `fcfg.libraries`
   * can be a list, or a per-provider dict (`{ "PLEX": ["1","2"] }`)
2. provider config:
   * `cfg["plex"][feature]["libraries"]` (or the `jellyfin`/`emby`/`anilist` equivalent, where defined)

Filtering behavior:

* Looks for the library id in any of `library_id`, `libraryId`, `library`, `section_id`, or `sectionId` on each item.
* Items with unknown library id are normally dropped.
* Exception: Plex history sets `allow_unknown=True`.
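
A minimal sketch of the whitelist filter, assuming the first non-empty library field wins, ids are compared as strings, and an empty whitelist means no filtering. `filter_libraries` and `library_of` are hypothetical names.

```python
from typing import Any, Dict, Iterable, Optional

LIB_FIELDS = ("library_id", "libraryId", "library", "section_id", "sectionId")

def library_of(item: Dict[str, Any]) -> Optional[str]:
    # First populated field wins; ids are normalized to strings.
    for field in LIB_FIELDS:
        val = item.get(field)
        if val not in (None, ""):
            return str(val)
    return None

def filter_libraries(idx: Dict[str, Dict[str, Any]], allowed: Iterable[str],
                     allow_unknown: bool = False) -> Dict[str, Dict[str, Any]]:
    allowed_ids = {str(a) for a in allowed}
    if not allowed_ids:
        return idx                    # no whitelist configured: keep everything
    out = {}
    for key, item in idx.items():
        lib = library_of(item)
        if lib is None:
            if allow_unknown:         # e.g. Plex history
                out[key] = item
        elif lib in allowed_ids:
            out[key] = item
    return out
```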

***

#### Step 5 — Index semantics: present vs delta

For each side:

```py
sem = ops.capabilities().get("index_semantics","present")
```

Then:

* if `sem == "delta"`: `full = prev | cur` (baseline merged with current delta)
* else: `full = eff` (present snapshot, possibly coerced by drop-guard)

In code, the resulting indices are named:

* destination: `dst_full`
* source: `src_idx`

***

#### Step 6 — Feature-specific filters (ratings only)

For `feature == "ratings"`:

* `types`: `fcfg.types` (normalized to `movie|show|episode`)
* `from_date`: `fcfg.from_date` (YYYY-MM-DD)

A rating entry is kept if:

* type matches (when `types` provided)
* `rated_at` is missing, or `rated_at[:10] >= from_date` (only checked when `from_date` is set; `YYYY-MM-DD` strings compare correctly as text)

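The keep/drop rule for rating entries, as a hypothetical `keep_rating` helper. It assumes each entry carries `type` and an ISO-8601 `rated_at`; the real filter may also normalize `types` first.

```python
from typing import Any, Dict, Iterable, Optional

def keep_rating(item: Dict[str, Any], types: Optional[Iterable[str]],
                from_date: Optional[str]) -> bool:
    if types and item.get("type") not in set(types):
        return False                  # type filter only applies when set
    rated_at = item.get("rated_at")
    if from_date and rated_at:
        # "YYYY-MM-DD" prefixes sort correctly as plain strings.
        return rated_at[:10] >= from_date
    return True                       # missing rated_at is kept
```
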
***

#### Step 7 — Planning (diff)

Manual adds (if any) are merged into the source index:

```py
src_idx = merge_manual_adds(src_idx, manual_adds)
```

Planner:

* presence features: `adds, removes = diff(src_idx, dst_full)`
* ratings: `adds, removes = diff_ratings(src_idx, dst_full)`
  * “adds” are **upserts** (new rating or rating change)
  * “removes” are **unrates** (rating exists on dst but not on src)

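A minimal sketch of both planners over dicts keyed by canonical key. Comparing a single `rating` field to detect changes is an assumption; these `diff` and `diff_ratings` functions are illustrative, not the real implementations.

```python
from typing import Any, Dict, List, Tuple

Index = Dict[str, Dict[str, Any]]

def diff(src: Index, dst: Index) -> Tuple[List[dict], List[dict]]:
    adds = [item for key, item in src.items() if key not in dst]
    removes = [item for key, item in dst.items() if key not in src]
    return adds, removes

def diff_ratings(src: Index, dst: Index) -> Tuple[List[dict], List[dict]]:
    # Upsert when dst lacks the rating or holds a different value.
    upserts = [item for key, item in src.items()
               if key not in dst or dst[key].get("rating") != item.get("rating")]
    unrates = [item for key, item in dst.items() if key not in src]
    return upserts, unrates
```
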
***

#### Step 8 — Anti-duplication using typed alias tokens

Before applying, alias maps are built (`_alias_index`) from `_typed_tokens(item)`.

Typed token rules:

* **movie/show**: `idkey:idval`
* **season**: `idkey:idval#season:N`
* **episode**: `idkey:idval#sSSeEE` (zero-padded)

Then `_present(idx, alias, item)` returns true if:

* canonical key is already in the index, OR
* any typed token matches an alias token

This reduces “already present” noise when the same title is keyed differently on each side (for example, by a different ID).
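
The token rules and the presence check can be sketched as below. `typed_tokens`, `alias_index`, and `present` are hypothetical analogues of `_typed_tokens`, `_alias_index`, and `_present`; the `ids` / `type` / `season` / `episode` item fields are assumed, and the sketch passes the canonical key in explicitly.

```python
from typing import Any, Dict, Set

def typed_tokens(item: Dict[str, Any]) -> Set[str]:
    kind = item.get("type")
    out: Set[str] = set()
    for idkey, idval in (item.get("ids") or {}).items():
        if idval in (None, ""):
            continue
        base = f"{idkey}:{idval}"
        if kind == "season":
            out.add(f"{base}#season:{item.get('season')}")
        elif kind == "episode":                       # zero-padded sSSeEE
            out.add(f"{base}#s{int(item['season']):02d}e{int(item['episode']):02d}")
        else:                                         # movie / show
            out.add(base)
    return out

def alias_index(idx: Dict[str, Dict[str, Any]]) -> Dict[str, str]:
    # Map every typed token to the canonical key that owns it.
    alias: Dict[str, str] = {}
    for key, item in idx.items():
        for tok in typed_tokens(item):
            alias.setdefault(tok, key)
    return alias

def present(idx: Dict[str, Any], alias: Dict[str, str], key: str,
            item: Dict[str, Any]) -> bool:
    return key in idx or any(tok in alias for tok in typed_tokens(item))
```

For example, an incoming movie keyed `imdb:tt0133093` counts as present when the destination stored it under `tmdb:603` but also recorded the same IMDb id.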

How the check is applied:

Presence features:

* `adds`: drop anything already present in `dst_full`
* `removes`: drop anything that still exists in `src_idx`
* extra safety: only remove if the key is in the `prev_dst` baseline

Ratings:

* `removes` (unrates): only if not present in `src_idx`
* extra safety: only unrate if the key is in the `prev_dst` baseline

***

#### Step 9 — Flag gating (adds/removes)

The add/remove flags resolved in Step 0 then gate the plan:

* `allow_adds == False` → `adds = []`
* `allow_removes == False` → `removes = []`

***

#### Step 10 — Mass delete protection

`maybe_block_mass_delete(removes, baseline_size=len(dst_full), allow_mass_delete=sync.allow_mass_delete, suspect_ratio=runtime.suspect_shrink_ratio)`

If mass deletes are not allowed and `len(removes) > baseline_size * suspect_ratio`, all removals are dropped and `mass_delete:blocked` is emitted.

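A sketch of the threshold check with the documented parameters, returning the removals to apply plus a blocked flag (that return shape is an assumption, and `maybe_block_mass_delete_sketch` is a hypothetical name). With a baseline of 200 items and a ratio of `0.10`, up to 20 removals pass and a 21st blocks the whole batch.

```python
from typing import Any, List, Tuple

def maybe_block_mass_delete_sketch(removes: List[Any], baseline_size: int,
                                   allow_mass_delete: bool,
                                   suspect_ratio: float) -> Tuple[List[Any], bool]:
    """Return (removes_to_apply, blocked)."""
    if allow_mass_delete:
        return removes, False
    if len(removes) > baseline_size * suspect_ratio:
        return [], True               # caller emits mass_delete:blocked
    return removes, False
```

Note that in this sketch an empty baseline blocks any removal at all, which is the conservative outcome.
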
***

#### Step 11 — Blocklists (tombstones + unresolved + blackbox)

Only applied to **adds** and only when `feature != "watchlist"`:

```py
adds = apply_blocklist(state_store, adds, dst=dst, feature=feature, pair_key=PAIR)
```

`apply_blocklist` removes items that match any of these keys:

* pair-scoped tombstones (`tombstones.json`)
* unresolved keys (`*.unresolved.json` files)
* blackbox keys (`*.blackbox.json` files)

Notes:

* tombstones are pair-only (no global tomb list in current `_pairs_blocklist.py`)
* unresolved is loaded cross-feature by default (can block across features)
* watchlist skips this whole step; watchlist uses PhantomGuard instead (below)

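A sketch of the blocklist filter, assuming tombstone keys use the `{feature}:{PAIR}|{token}` format described in Step 17, unresolved and blackbox entries are plain canonical keys, and each planned item carries a hypothetical `key` field. The watchlist early return mirrors the note above; in the real code that skip happens at the call site.

```python
from typing import Any, Dict, List, Set

def apply_blocklist_sketch(adds: List[Dict[str, Any]], *, feature: str, pair: str,
                           tombstones: Set[str], unresolved: Set[str],
                           blackbox: Set[str]) -> List[Dict[str, Any]]:
    if feature == "watchlist":
        return adds                   # watchlist relies on PhantomGuard instead
    kept = []
    for item in adds:
        key = item["key"]             # hypothetical: canonical key on the item
        tokens = {key} | {f"{k}:{v}" for k, v in (item.get("ids") or {}).items() if v}
        tomb_hit = any(f"{feature}:{pair}|{t}" in tombstones for t in tokens)
        if tomb_hit or key in unresolved or key in blackbox:
            continue                  # blocked
        kept.append(item)
    return kept
```
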
***

#### Step 12 — Manual blocks

If manual blocks exist:

* they are applied to both `adds` and `removes`
* emits debug `blocked.manual`

Manual block tokens can match by:

* canonical key
* any id token `idkey:idval`
* `title:<lower>|year:<year>` (lowercased title plus year)

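Matching can be sketched by expanding each item into its candidate tokens and intersecting them with the block set. The item field names (`ids`, `title`, `year`) and the `manual_block_tokens` / `drop_manual_blocks` helpers are assumptions; the token formats follow the list above.

```python
from typing import Any, Dict, Set

def manual_block_tokens(key: str, item: Dict[str, Any]) -> Set[str]:
    tokens = {key}                                    # canonical key
    for idkey, idval in (item.get("ids") or {}).items():
        if idval not in (None, ""):
            tokens.add(f"{idkey}:{idval}")            # idkey:idval
    title, year = item.get("title"), item.get("year")
    if title and year:
        tokens.add(f"title:{str(title).lower()}|year:{year}")
    return tokens

def drop_manual_blocks(items: Dict[str, Dict[str, Any]],
                       blocks: Set[str]) -> Dict[str, Dict[str, Any]]:
    # Keep an item only if none of its tokens appear in the block set.
    return {k: it for k, it in items.items() if not (manual_block_tokens(k, it) & blocks)}
```
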
***

#### Step 13 — Unresolved filter (adds only)

The code additionally filters planned `adds` against unresolved keys:

```py
unresolved_known = load_unresolved_keys(dst, feature, cross_features=True)
adds = [it for it in adds if canonical_key(it) not in unresolved_known]
```

{% hint style="warning" %}
Heads-up: `record_unresolved()` writes `*.unresolved.pending.json`, while `load_unresolved_keys()` looks for `*.unresolved.json`.

If nothing promotes pending entries to the active file, unresolved-based blocking may have limited effect.
{% endhint %}

***

#### Step 14 — Emit the plan

`emit("one:plan", adds=..., removes=..., src_count=len(src_idx), dst_count=len(dst_full))`

***

#### Step 15 — Watchlist “phantom” guard (anti-flap, not `_blackbox.py`)

Although it is configured under the root-level `cfg["blackbox"]` key, this step uses **PhantomGuard**, not the `_blackbox.py` module:

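```py
# Root-level "blackbox" config drives PhantomGuard (not _blackbox.py)
bb = cfg.get("blackbox") or {}
use_phantoms = bool(bb.get("enabled")) and bool(bb.get("block_adds"))
ttl_days = bb.get("cooldown_days")
guard = PhantomGuard(src, dst, feature, ttl_days=ttl_days, enabled=use_phantoms)
```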

Behavior:

* blocks planned adds whose keys are known “phantoms” (previously added but didn’t stick)
* persists its tracking state to:
  * `/config/.cw_state/{feature}.{src}-{dst}.{scope}.phantoms.json`
  * `/config/.cw_state/{feature}.{src}-{dst}.{scope}.last_success.json`
* emits a `blocked.counts` event for visibility

Special case: ratings “adds” are split into:

* updates (already present on dst) → not phantom-filtered
* fresh adds → phantom-filtered

***

#### Step 16 — Apply ADDs (dst writes)

If `dst` is down:

* everything in `adds` is recorded as unresolved, and `writes:skipped op=add` is emitted.

Otherwise:

* `apply_add(...)` writes in chunks with retries:
  * chunk size comes from `effective_chunk_size(ctx, dst)`
  * optional per-chunk pause via `ctx.apply_chunk_pause_ms`, plus a slower mode when rate-limited
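
A generic chunk-and-retry loop, to show the shape of the write path. Everything here is an assumption for illustration: `write_in_chunks`, the retry count, exponential backoff, and recording a failed chunk instead of raising. The rate-limit slow mode is not modeled.

```python
import time
from typing import Any, Callable, Dict, List, Sequence

def write_in_chunks(items: Sequence[Any],
                    write: Callable[[Sequence[Any]], Dict[str, Any]],
                    chunk_size: int, pause_ms: int = 0, retries: int = 2,
                    backoff_s: float = 1.0,
                    sleep: Callable[[float], None] = time.sleep) -> List[Dict[str, Any]]:
    results: List[Dict[str, Any]] = []
    for start in range(0, len(items), chunk_size):
        chunk = items[start:start + chunk_size]
        for attempt in range(retries + 1):
            try:
                results.append(write(chunk))
                break
            except Exception as exc:  # sketch: every error is treated as retryable
                if attempt == retries:
                    results.append({"ok": False, "error": str(exc), "attempted": list(chunk)})
                else:
                    sleep(backoff_s * (2 ** attempt))  # exponential backoff
        if pause_ms and start + chunk_size < len(items):
            sleep(pause_ms / 1000.0)  # optional pause between chunks
    return results
```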

#### Confirmation and pessimism rules

The code tries to compute “effective adds” conservatively:

Inputs:

* provider result may include:
  * `confirmed` / `count`
  * `confirmed_keys` (best case)
  * `skipped_keys`
  * `unresolved` list

If `confirmed_keys` exist:

* uses those (filtered to attempted set)

If not:

* treats the attempted keys minus the unresolved ones as confirmed

If `sync.verify_after_write` is enabled **and** provider supports it (`capabilities.verify_after_write==true`):

* any key that ends up unresolved after the write is removed from `confirmed_keys`

Fallback safety:

* if the provider confirms 0 items and reports **no unresolved items**, the whole batch is pessimistically marked unresolved (`apply:add:no_confirmations_fallback`).

Ambiguous partial:

* if the provider response is too ambiguous (a mix of skipped keys and partial confirmations), the code sets `added_effective = 0` and avoids poisoning the blackbox and unresolved lists.
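
The confirmation rules can be approximated as a function that returns `(confirmed, unresolved)` key sets for one batch. It is a simplified sketch: it assumes the result dict uses the field names listed above with string keys, and it does not model `verify_after_write` or the ambiguous-partial case.

```python
from typing import Any, Dict, Iterable, Set, Tuple

def effective_adds(attempted: Iterable[str], res: Dict[str, Any]) -> Tuple[Set[str], Set[str]]:
    """Return (confirmed, unresolved) keys for one add batch."""
    tried = set(attempted)
    skipped = set(res.get("skipped_keys") or []) & tried
    unresolved = set(res.get("unresolved") or []) & tried
    reported = res.get("confirmed", res.get("count"))
    if reported == 0 and not unresolved:
        # Pessimistic fallback: nothing confirmed and nothing explained.
        return set(), tried
    if res.get("confirmed_keys") is not None:
        confirmed = set(res["confirmed_keys"]) & tried   # best case
    else:
        confirmed = tried - unresolved - skipped         # inferred
    return confirmed, unresolved
```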

Blackbox + unresolved side effects:

* failed keys (not confirmed, not skipped) get:
  * `record_attempts(...)` (flap counters → possible blackbox promotion)
  * `record_unresolved(..., hint="apply:add:failed")`
* success keys get:
  * `record_success(...)` (resets flap counters)
  * `guard.record_success(...)` for PhantomGuard

For successfully added keys (outside dry run):

* update `dst_full[k] = minimal(item)`
* bust the dst snapshot cache (`_bust_snapshot(dst)`)

***

#### Step 17 — Apply REMOVEs (dst writes)

If `dst` is down:

* the planned removes are recorded as unresolved, and `writes:skipped op=remove` is emitted.

Otherwise:

* `apply_remove(...)` writes in chunks

After confirmed removals (not dry run):

1. **pair-scoped tombstones are written** directly into `tombstones.json`:
   * stores canonical key and every ID token present in `ids`
   * key format: `{feature}:{PAIR}|{token}` where PAIR is e.g. `PLEX-SIMKL`
2. the removed keys are popped from `dst_full`
3. bust dst snapshot cache

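A sketch of the tombstone entries written for one removed item, using the key format above. Storing a Unix timestamp as the value is an assumption, and `tombstone_entries` is a hypothetical helper.

```python
from typing import Any, Dict

def tombstone_entries(feature: str, pair: str, key: str,
                      item: Dict[str, Any], now: int) -> Dict[str, int]:
    tokens = {key}                                    # canonical key
    for idkey, idval in (item.get("ids") or {}).items():
        if idval not in (None, ""):
            tokens.add(f"{idkey}:{idval}")            # every ID token
    return {f"{feature}:{pair}|{tok}": now for tok in sorted(tokens)}
```

A caller would merge the result into the loaded tombstone map, for example `tombstones.update(tombstone_entries(...))`, before saving `tombstones.json`.
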
***

#### Step 18 — Persist baselines + checkpoints

Writes back to `state.json`:

* stores baseline `items` for **both** src and dst
* stores checkpoint for both, if present

Baseline persistence filters out items that should not be saved:

* if item has:
  * `_cw_persist == False` OR `_cw_transient == True` OR `_cw_skip_persist == True` → skipped
* if provider-specific subobject is marked ignored:
  * `item["plex"]["ignored"] == True` (or jellyfin/emby/anilist by provider map) → skipped

Then it sets:

* `state["last_sync_epoch"] = now`
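
The persistence filter can be sketched as a predicate. `provider_key` stands for the lowercase provider sub-object name (for example `plex`); that mapping and the helper names are assumptions.

```python
from typing import Any, Dict

def persistable(item: Dict[str, Any], provider_key: str) -> bool:
    if item.get("_cw_persist") is False:
        return False
    if item.get("_cw_transient") is True or item.get("_cw_skip_persist") is True:
        return False
    sub = item.get(provider_key)      # e.g. item["plex"]
    if isinstance(sub, dict) and sub.get("ignored") is True:
        return False
    return True

def baseline_items(idx: Dict[str, Dict[str, Any]], provider_key: str) -> Dict[str, Dict[str, Any]]:
    # Only persistable items make it into the saved baseline.
    return {k: it for k, it in idx.items() if persistable(it, provider_key)}
```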

***

### Config knobs that matter

Under `cfg["sync"]`:

* `enable_add` / `enable_remove`
* `drop_guard`
* `allow_mass_delete`
* `verify_after_write`
* `dry_run`
* `include_observed_deletes` (currently only sets a debug flag; it does not change one-way behavior)

Under the feature config (`fcfg`):

* `enable`
* `add` / `remove`
* `libraries` (history/ratings)
* `types`, `from_date` (ratings)

Root-level `cfg["blackbox"]` (PhantomGuard, not the `_blackbox.py` module):

* `enabled`
* `block_adds`
* `cooldown_days`

`sync.blackbox` (the `_blackbox.py` module used by `record_attempts` / `record_success`):

* `promote_after`, `pair_scoped`, `cooldown_days`, etc.

***

### Notes

* Destination `down` still produces a plan.
* Destination `down` skips writes and records unresolved.
* `watchlist` skips `apply_blocklist`. It relies on phantom tracking instead.
* Removals are guarded by:
  * “only remove if it existed in the destination baseline”
  * optional mass-delete protection

{% endtab %}
{% endtabs %}
