One-way sync
Exact one-way pipeline (src → dst) used by the orchestrator for one feature run, including guardrails and state writes.
One-way sync means: read from source, write to target.
It is the safest mode and the recommended first setup.
When to use it
Use one-way when you want a clear source of truth.
Typical pattern:
Media server → tracker
One feature at a time
Recommended workflow
Enable Dry run first.
Enable Add.
Keep Remove off until you trust matching.
Expand to more features and pairs slowly.
Common “why did nothing happen?”
The pair feature is disabled.
Adds/removes are disabled.
A provider is down or auth failed.
Snapshots were coerced by drop guard (safe behavior).
Related:
Safer defaults: Best practice
Safety model: Guardrails
Where state is stored: State
Implementation notes
Primary implementation: cw_platform/orchestrator/_pairs_oneway.py
Key helpers: _snapshots.py, _planner.py, _applier.py, _pairs_blocklist.py, _pairs_massdelete.py, _unresolved.py, _blackbox.py, _phantoms.py
Related runtime docs:
Overview
Per feature, one-way does:
Gate by provider availability and health.
Build snapshots for `src` and `dst`.
Load previous baselines and manual policy.
Apply drop-guard and per-feature filters.
Plan adds/removes (or rating upserts/unrates).
Apply guardrails (mass delete, blocklists, phantoms).
Write changes in chunks with retries.
Persist updated baselines, checkpoints, and run metadata.
Technical reference
Entry point
run_one_way_feature(ctx, src, dst, feature=..., fcfg=..., health_map=...)
Emits (high level):
`feature:start`, plan/apply events
feature:done
Returns a summary object like:
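The exact shape is defined in `_pairs_oneway.py`; the following is a rough, hypothetical illustration only. `ok` is confirmed by the gating rules below; the counter field names are assumptions.

```python
# Hypothetical shape for illustration; only "ok" is documented by the
# gating rules in Step 0. The counter names are assumptions.
summary = {
    "ok": True,        # False on hard gates (missing provider ops)
    "feature": "ratings",
    "added": 12,       # assumed field name
    "removed": 0,      # assumed field name
    "dry_run": False,  # assumed field name
}
```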
Step 0 — Provider + health gating
Hard gates and early exits:
Missing provider ops:
If `ctx.providers[src]` or `ctx.providers[dst]` is missing → returns `ok: false`.
Auth failed:
If the health status is `auth_failed` for either side → the pair is skipped early.
Feature unsupported:
If capabilities say unsupported (`capabilities.features[feature] == false`) → emits `feature:unsupported`.
If health says the feature is not OK (`health.features[feature] == false`) → emits `feature:unsupported`.
Returns `ok: true` with no ops.
Source down:
If `src` is `down` → emits `writes:skipped reason=source_down` and returns.
Destination down:
Planning still happens.
Writes are skipped.
Items are typically recorded as unresolved.
Flags:
`allow_adds`: from `fcfg.add` or `sync.enable_add` (default `true`)
`allow_removes`: from `fcfg.remove` or `sync.enable_remove` (default `false`)
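A minimal sketch of that fallback chain, assuming `fcfg` and the `sync` section are plain dicts:

```python
def resolve_write_flags(fcfg: dict, sync_cfg: dict) -> tuple:
    """Per-feature override wins, then the global sync setting,
    then the documented defaults (adds on, removes off)."""
    allow_adds = bool(fcfg.get("add", sync_cfg.get("enable_add", True)))
    allow_removes = bool(fcfg.get("remove", sync_cfg.get("enable_remove", False)))
    return allow_adds, allow_removes
```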
Step 1 — Build snapshots (src + dst)
Only the two providers involved are passed to the snapshot builder.
Snapshot caching:
uses `ctx.snap_cache` + `ctx.snap_ttl_sec`
`_bust_snapshot(dst)` is called after successful writes to prevent using stale cached snapshots later in the same run
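A minimal sketch of TTL caching plus the bust, assuming a `(provider, feature)` cache key (the real key and cache layout are not documented here):

```python
import time

def get_snapshot(ctx, provider: str, feature: str, build):
    """Serve a cached snapshot while it is younger than ctx.snap_ttl_sec."""
    key = (provider, feature)                 # assumed cache key
    hit = ctx.snap_cache.get(key)
    if hit and time.time() - hit["ts"] < ctx.snap_ttl_sec:
        return hit["snap"]
    snap = build(provider, feature)
    ctx.snap_cache[key] = {"ts": time.time(), "snap": snap}
    return snap

def bust_snapshot(ctx, provider: str) -> None:
    """Drop cached snapshots for a provider after successful writes."""
    for key in [k for k in ctx.snap_cache if k[0] == provider]:
        del ctx.snap_cache[key]
```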
Step 2 — Load previous baselines + manual policy
Loads state.json (merged with state.manual.json via StateStore.load_state()):
`prev_src` baseline: `state.providers[src][feature].baseline.items`
`prev_dst` baseline: `state.providers[dst][feature].baseline.items`
Manual policy is read from the source provider’s state node:
manual adds: a dict of `canonical_key -> item`
manual blocks: a set of tokens (keys / IDs / title-year tokens)
Manual adds are merged into src_idx later. Manual blocks are applied after blocklist filtering.
Step 3 — Drop guard (optional)
Controlled by:
`sync.drop_guard` (bool)
runtime knobs:
`runtime.suspect_min_prev` (default 20)
`runtime.suspect_shrink_ratio` (default 0.10)
`runtime.suspect_debug` (default True)
If enabled:
computes `prev_cp` from state and `now_cp` via `module_checkpoint(ops, cfg, feature)`
calls `coerce_suspect_snapshot(...)` for both src and dst
Result:
`eff_src`, `eff_dst` are either the current snapshot or coerced back to the baseline
emits `snapshot:suspect` (from `_snapshots.py`) + a local `dbg("snapshot.guard", ...)`
If disabled:
`eff_src = src_cur`, `eff_dst = dst_cur`
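A minimal sketch of the size heuristic behind the coercion (the real `coerce_suspect_snapshot(...)` also weighs checkpoints; thresholds follow the runtime knobs above):

```python
def coerce_if_suspect(prev: dict, cur: dict,
                      min_prev: int = 20,
                      shrink_ratio: float = 0.10) -> tuple:
    """If a previously sizable snapshot collapses below the shrink ratio,
    fall back to the baseline rather than planning mass removals."""
    if len(prev) >= min_prev and len(cur) < len(prev) * shrink_ratio:
        return prev, True   # coerced; caller emits snapshot:suspect
    return cur, False
```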
Step 4 — Library whitelists (history/ratings only)
Library filtering applies when feature in ("history", "ratings").
Whitelist sources (in order):
feature config: `fcfg.libraries` can be a list, or a per-provider dict (`{ "PLEX": ["1","2"] }`)
provider config: `cfg["plex"][feature]["libraries"]` (or the jellyfin/emby/anilist mapping where defined)
Filtering behavior:
Looks at `library_id|libraryId|library|section_id|sectionId` on each item.
Items with an unknown library id are normally dropped.
Exception: Plex history sets `allow_unknown=True` (sketched below).
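A minimal sketch of the per-item check, assuming the whitelist is already normalized to a set of id strings:

```python
_LIB_KEYS = ("library_id", "libraryId", "library", "section_id", "sectionId")

def in_whitelist(item: dict, whitelist: set,
                 allow_unknown: bool = False) -> bool:
    """Keep items whose library id is whitelisted; unknown ids are dropped
    unless allow_unknown is set (the Plex-history exception)."""
    lib = next((str(item[k]) for k in _LIB_KEYS if item.get(k) is not None), None)
    if lib is None:
        return allow_unknown
    return lib in whitelist
```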
Step 5 — Index semantics: present vs delta
For each side, the index semantics are determined first: `present` (full set) or `delta` (changes only).
Then:
if `sem == "delta"`: `full = prev | cur` (baseline merged with the current delta)
else: `full = eff` (present snapshot, possibly coerced by drop-guard)
In code:
destination: `dst_full`
source: `src_idx`
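In dict terms, a minimal sketch (assuming snapshots are dicts keyed by canonical key):

```python
def build_full_index(sem: str, prev: dict, cur: dict, eff: dict) -> dict:
    """Delta snapshots carry only changes, so they are layered on top of
    the stored baseline; present snapshots already describe the full set."""
    if sem == "delta":
        return {**prev, **cur}    # prev | cur: baseline merged with the delta
    return eff                    # present snapshot, possibly drop-guard coerced
```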
Step 6 — Feature-specific filters (ratings only)
For feature == "ratings":
`types`: `fcfg.types` (normalized to `movie|show|episode`)
`from_date`: `fcfg.from_date` (`YYYY-MM-DD`)
A rating entry is kept if:
type matches (when `types` is provided)
`rated_at` is missing, or `rated_at[:10] >= from_date`
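A minimal sketch of the keep predicate (field names as above):

```python
def keep_rating(entry: dict, types, from_date) -> bool:
    """Optional type whitelist plus an optional YYYY-MM-DD floor on
    rated_at; entries without rated_at are kept by design."""
    if types and entry.get("type") not in types:
        return False
    rated_at = entry.get("rated_at")
    if from_date and rated_at:
        return rated_at[:10] >= from_date
    return True
```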
Step 7 — Planning (diff)
Manual adds (if any) are merged into the source index first.
Planner:
presence features: `adds, removes = diff(src_idx, dst_full)`
ratings: `adds, removes = diff_ratings(src_idx, dst_full)`
“adds” are upserts (new rating or rating change)
“removes” are unrates (rating exists on dst but not on src)
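For presence features, the diff is essentially key-set arithmetic; a minimal sketch (the real `diff()` also carries item payloads and key normalization):

```python
def diff(src_idx: dict, dst_full: dict) -> tuple:
    """adds: on src but missing on dst; removes: on dst but gone from src."""
    adds = [item for key, item in src_idx.items() if key not in dst_full]
    removes = [item for key, item in dst_full.items() if key not in src_idx]
    return adds, removes
```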
Step 8 — Anti-duplication using typed alias tokens
Before applying:
builds alias maps (`_alias_index`) using `_typed_tokens(item)`
Typed token rules:
movie/show: `idkey:idval`
season: `idkey:idval#season:N`
episode: `idkey:idval#sSSeEE` (zero-padded)
Then _present(idx, alias, item) returns true if:
canonical key is already in the index, OR
any typed token matches an alias token
This is used to reduce “already present” noise caused by key mismatches.
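A minimal sketch of both helpers, assuming items carry `type`, an `ids` dict, and season/episode numbers (reading the canonical key off the item is an assumption):

```python
def typed_tokens(item: dict) -> set:
    """Alias tokens per the rules above, one per (idkey, idval) pair."""
    base = {f"{k}:{v}" for k, v in (item.get("ids") or {}).items()}
    t = item.get("type")
    if t == "season":
        return {f"{tok}#season:{item['season']}" for tok in base}
    if t == "episode":
        return {f"{tok}#s{item['season']:02d}e{item['episode']:02d}" for tok in base}
    return base  # movie/show: plain idkey:idval

def present(idx: dict, alias: set, item: dict) -> bool:
    """True if the canonical key is indexed or any typed token matches."""
    return item.get("key") in idx or bool(typed_tokens(item) & alias)
```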
Applied logic:
Presence features:
adds: drop anything already present in `dst_full`
removes: drop anything that still exists in `src_idx`
extra safety: only remove if the key is in the `prev_dst` baseline
Ratings:
removes (unrates): only if not present in `src_idx`
extra safety: only unrate if the key is in the `prev_dst` baseline
Step 9 — Flag gating (adds/removes)
If:
`allow_adds == False` → `adds = []`
`allow_removes == False` → `removes = []`
Step 10 — Mass delete protection
maybe_block_mass_delete(removes, baseline_size=len(dst_full), allow_mass_delete=sync.allow_mass_delete, suspect_ratio=runtime.suspect_shrink_ratio)
If mass deletes are not allowed and:
`len(removes) > baseline_size * suspect_ratio`
then removals are dropped completely and `mass_delete:blocked` is emitted.
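A minimal sketch of the threshold check (argument names follow the call above):

```python
def maybe_block_mass_delete(removes: list, baseline_size: int,
                            allow_mass_delete: bool,
                            suspect_ratio: float) -> list:
    """Drop every removal when too large a share of the destination
    would be deleted in one run; the caller emits mass_delete:blocked."""
    if not allow_mass_delete and len(removes) > baseline_size * suspect_ratio:
        return []
    return removes
```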
Step 11 — Blocklists (tombstones + unresolved + blackbox)
Only applied to adds and only when feature != "watchlist":
apply_blocklist removes items that match any of these keys:
pair-scoped tombstones (`tombstones.json`)
unresolved keys (`*.unresolved.json` files)
blackbox keys (`*.blackbox.json` files)
Notes:
tombstones are pair-only (no global tombstone list in the current `_pairs_blocklist.py`)
unresolved is loaded cross-feature by default (can block across features)
watchlist skips this whole step; it uses PhantomGuard instead (below)
Step 12 — Manual blocks
If manual blocks exist:
they are applied to both `adds` and `removes`
emits a `blocked.manual` debug
Manual block tokens can match by:
canonical key
any id token (`idkey:idval`)
`title:<lower>|year:<year>`
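A minimal sketch of the token match, assuming `blocks` is the token set loaded in Step 2:

```python
def manually_blocked(key: str, item: dict, blocks: set) -> bool:
    """Match by canonical key, any idkey:idval token, or the
    title:<lower>|year:<year> token."""
    if key in blocks:
        return True
    if any(f"{k}:{v}" in blocks for k, v in (item.get("ids") or {}).items()):
        return True
    title, year = item.get("title"), item.get("year")
    return bool(title and year) and f"title:{title.lower()}|year:{year}" in blocks
```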
Step 13 — Unresolved filter (adds only)
The code additionally filters planned adds against unresolved keys.
Heads-up: record_unresolved() writes *.unresolved.pending.json, while load_unresolved_keys() looks for *.unresolved.json.
If nothing promotes pending → active, unresolved blocking may be limited.
Step 14 — Emit the plan
emit("one:plan", adds=..., removes=..., src_count=len(src_idx), dst_count=len(dst_full))
Step 15 — Watchlist “phantom” guard (anti-flap, not _blackbox.py)
This is confusingly wired under root-level `cfg["blackbox"]`, but it’s PhantomGuard:
Behavior:
blocks planned adds whose keys are known “phantoms” (previously added but didn’t stick)
writes blocked items to:
`/config/.cw_state/{feature}.{src}-{dst}.{scope}.phantoms.json`
`/config/.cw_state/{feature}.{src}-{dst}.{scope}.last_success.json`
emits a `blocked.counts` event for visibility
Special case: ratings “adds” are split into:
updates (already present on dst) → not phantom-filtered
fresh adds → phantom-filtered
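A minimal sketch of that split, assuming `phantoms` is the key set loaded from the phantoms file and plan items carry a `key`:

```python
def split_rating_adds(adds: list, dst_full: dict, phantoms: set) -> list:
    """Updates to ratings already on dst bypass the phantom filter;
    only genuinely fresh adds are checked against known phantom keys."""
    updates = [a for a in adds if a["key"] in dst_full]
    fresh = [a for a in adds
             if a["key"] not in dst_full and a["key"] not in phantoms]
    return updates + fresh
```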
Step 16 — Apply ADDs (dst writes)
If dst is down:
everything in `adds` is recorded as unresolved and `writes:skipped op=add` is emitted.
Otherwise:
`apply_add(...)` with chunking + retries:
chunk size from `effective_chunk_size(ctx, dst)`
optional per-chunk pause: `ctx.apply_chunk_pause_ms` + rate-limit slow mode
Confirmation and pessimism rules
The code tries to compute “effective adds” conservatively:
Inputs:
provider result may include:
`confirmed` / `count`
`confirmed_keys` (best case)
`skipped_keys`
an `unresolved` list
If confirmed_keys exist:
uses those (filtered to attempted set)
If not:
assumes attempted minus unresolved are “confirmed keys”
If `sync.verify_after_write` is enabled and the provider supports it (`capabilities.verify_after_write == true`):
any key that ends up unresolved after the write is removed from `confirmed_keys`
Fallback safety:
if the provider confirms 0 and no unresolved were produced, it pessimistically marks the whole batch unresolved (`apply:add:no_confirmations_fallback`).
Ambiguous partial:
if the provider response is too fuzzy (a mix of skipped + partial confirmations), the code sets `added_effective = 0` and avoids poisoning blackbox/unresolved (see the sketch below).
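A minimal sketch of the confirmation ladder, assuming the provider result is a dict with the optional fields listed above (the ambiguous-partial detection is simplified away):

```python
def effective_add_keys(attempted: set, result: dict,
                       verified_unresolved: set) -> set:
    """Conservatively derive which attempted adds actually landed."""
    unresolved = set(result.get("unresolved") or [])
    if result.get("confirmed_keys"):
        confirmed = set(result["confirmed_keys"]) & attempted  # best case
    elif not result.get("confirmed") and not unresolved:
        return set()  # pessimistic fallback: whole batch treated as unresolved
    else:
        confirmed = attempted - unresolved  # assume the rest landed
    return confirmed - verified_unresolved  # verify_after_write correction
```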
Blackbox + unresolved side effects:
failed keys (not confirmed, not skipped) get:
`record_attempts(...)` (flap counters → possible blackbox promotion)
`record_unresolved(..., hint="apply:add:failed")`
success keys get:
`record_success(...)` (resets flap counters)
`guard.record_success(...)` for PhantomGuard
After successful adds (not a dry run):
updates `dst_full[k] = minimal(item)`
busts the dst snapshot cache (`_bust_snapshot(dst)`)
Step 17 — Apply REMOVEs (dst writes)
If dst is down:
records unresolved and emits `writes:skipped op=remove`.
Otherwise:
`apply_remove(...)`, chunked
After confirmed removals (not dry run):
pair-scoped tombstones are written directly into `tombstones.json`:
stores the canonical key and every ID token present in `ids`
key format: `{feature}:{PAIR}|{token}` where PAIR is e.g. `PLEX-SIMKL` (sketched below)
the removed keys are popped from `dst_full`
the dst snapshot cache is busted
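A minimal sketch of that key construction:

```python
def tombstone_keys(feature: str, src: str, dst: str,
                   canonical_key: str, ids: dict) -> set:
    """One tombstone token per identifier: the canonical key plus
    every idkey:idval present in ids, all pair-scoped."""
    pair = f"{src}-{dst}"                                  # e.g. PLEX-SIMKL
    tokens = {canonical_key} | {f"{k}:{v}" for k, v in ids.items()}
    return {f"{feature}:{pair}|{tok}" for tok in tokens}

# tombstone_keys("watchlist", "PLEX", "SIMKL", "movie:tmdb:603", {"tmdb": 603})
# -> {"watchlist:PLEX-SIMKL|movie:tmdb:603", "watchlist:PLEX-SIMKL|tmdb:603"}
```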
Step 18 — Persist baselines + checkpoints
Writes back to state.json:
stores baseline `items` for both src and dst
stores the checkpoint for both, if present
Baseline persistence filters out items that should not be saved:
if the item has `_cw_persist == False` OR `_cw_transient == True` OR `_cw_skip_persist == True` → skipped
if the provider-specific subobject is marked ignored: `item["plex"]["ignored"] == True` (or jellyfin/emby/anilist by provider map) → skipped
Then it sets:
state["last_sync_epoch"] = now
Config knobs that matter
Under cfg["sync"]:
`enable_add` / `enable_remove`
`drop_guard`
`allow_mass_delete`
`verify_after_write`
`dry_run`
`include_observed_deletes` (currently only affects a debug flag; no behavior in one-way)
Under feature config (fcfg):
`enable`
`add` / `remove`
`libraries` (history/ratings)
`types`, `from_date` (ratings)
Root-level cfg["blackbox"] (PhantomGuard, not the _blackbox.py module):
`enabled`
`block_adds`
`cooldown_days`
sync.blackbox (actual blackbox module used by record_attempts/record_success):
`promote_after`, `pair_scoped`, `cooldown_days`, etc.
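Putting the knobs together, a hedged example of the relevant config slices as Python dicts (all values illustrative):

```python
cfg = {
    "sync": {
        "enable_add": True,
        "enable_remove": False,     # keep off until matching is trusted
        "drop_guard": True,
        "allow_mass_delete": False,
        "verify_after_write": True,
        "dry_run": True,            # recommended first run
        "blackbox": {               # the actual blackbox module knobs
            "promote_after": 3,     # illustrative value
            "pair_scoped": True,
            "cooldown_days": 7,     # illustrative value
        },
    },
    "blackbox": {                   # root level: PhantomGuard, not _blackbox.py
        "enabled": True,
        "block_adds": True,
        "cooldown_days": 7,         # illustrative value
    },
}
```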
Notes
A `down` destination still produces a plan.
A `down` destination skips writes and records unresolved.
`watchlist` skips `apply_blocklist`; it relies on phantom tracking instead.
Removals are guarded by:
“only remove if it existed in the destination baseline”
optional mass-delete protection