
production ¤

Production Events and Tracking

This module contains both event detection and daily tracking tools for production:

Event Detection Classes:

  • MachineStateEvents: Run/idle intervals and transition points from a boolean state signal.

  • detect_run_idle: Intervalize run/idle with optional min duration.
  • transition_events: Point events on idle→run and run→idle changes.

  • LineThroughputEvents: Throughput metrics and takt adherence.
  • count_parts: Parts per fixed window from a counter uuid.
  • takt_adherence: Cycle time violations vs. a takt time.

  • ChangeoverEvents: Product/recipe changes and end-of-changeover derivation.

  • detect_changeover: Point events at product value changes.
  • changeover_window: End via fixed window or stable band metrics.

  • FlowConstraintEvents: Blocked/starved intervals between upstream/downstream run signals.

  • blocked_events: Upstream running while downstream not consuming.
  • starved_events: Downstream running while upstream not supplying.

Daily Production Tracking Classes:

  • PartProductionTracking: Track production quantities by part number.

  • production_by_part: Production quantity per time window.
  • daily_production_summary: Daily totals by part.
  • production_totals: Totals over date ranges.

  • CycleTimeTracking: Analyze cycle times by part number.
  • cycle_time_by_part: Calculate cycle times.
  • cycle_time_statistics: Statistical analysis (min/avg/max/std).
  • detect_slow_cycles: Anomaly detection.
  • cycle_time_trend: Trend analysis.

  • ShiftReporting: Shift-based performance analysis.

  • shift_production: Production per shift.
  • shift_comparison: Compare shift performance.
  • shift_targets: Target vs actual analysis.
  • best_and_worst_shifts: Performance ranking.

  • DowntimeTracking: Machine availability and downtime analysis.

  • downtime_by_shift: Downtime and availability per shift.
  • downtime_by_reason: Root cause analysis.
  • top_downtime_reasons: Pareto analysis (80/20 rule).
  • availability_trend: Track availability over time.

  • QualityTracking: NOK (defective parts) and quality metrics.

  • nok_by_shift: NOK parts and First Pass Yield per shift.
  • quality_by_part: Quality metrics by part number.
  • nok_by_reason: Defect type analysis.
  • daily_quality_summary: Daily quality rollup.

OEE and Advanced Analytics:

  • OEECalculator: Overall Equipment Effectiveness (Availability × Performance × Quality).

  • calculate_availability: Availability % from run/idle intervals.
  • calculate_performance: Actual vs ideal throughput.
  • calculate_quality: Good parts / total parts.
  • calculate_oee: Combined daily OEE metric.

  • AlarmManagementEvents: ISA-18.2 style alarm analysis.
  • alarm_frequency: Alarm activations per time window.
  • alarm_duration_stats: Min/avg/max/total duration of alarm ON states.
  • chattering_detection: Detect nuisance chattering alarms.
  • standing_alarms: Identify alarms that stay active too long.

  • BatchTrackingEvents: Batch/recipe production tracking.

  • detect_batches: Detect batch start/end from value changes.
  • batch_duration_stats: Duration statistics per batch type.
  • batch_yield: Production quantity per batch.
  • batch_transition_matrix: Batch-to-batch transition frequencies.

  • BottleneckDetectionEvents: Identify production line bottlenecks.

  • station_utilization: Per-station uptime percentage per window.
  • detect_bottleneck: Identify bottleneck station per window.
  • shifting_bottleneck: Track when the bottleneck moves.
  • throughput_constraint_summary: Summary statistics.

  • MicroStopEvents: Detect brief idle intervals that accumulate into losses.

  • detect_micro_stops: Find idle intervals shorter than max_duration.
  • micro_stop_frequency: Count micro-stops per window.
  • micro_stop_impact: Time lost to micro-stops per window.
  • micro_stop_patterns: Group micro-stops by hour-of-day.

  • DutyCycleEvents: Analyze on/off patterns from boolean signals.

  • duty_cycle_per_window: On-time percentage per window.
  • on_off_intervals: List every on/off interval with duration.
  • cycle_count: Transition counts per window.
  • excessive_cycling: Flag windows with too many transitions.

Traceability:

  • ValueTraceabilityEvents: Trace any shared identifier across multiple stations. (OrderTraceabilityEvents is a backwards-compatible alias.)

  • build_timeline: Full timeline of every identifier at every station.
  • lead_time: End-to-end lead time per identifier.
  • current_status: Last-known station for each identifier.
  • station_dwell_statistics: Dwell-time stats per station.

  • RoutingTraceabilityEvents: Trace item routing using ID + state/routing signal.
  • state_map: Maps signal values (PLC steps, station codes) to station names.
  • build_routing_timeline: Correlate ID signal with state signal.
  • lead_time: End-to-end lead time per item.
  • station_statistics: Dwell-time stats per station/step.
  • routing_paths: Most common routing path sequences.

  • MultiProcessTraceabilityEvents: Multi-station topology with parallel paths and handovers.

  • build_timeline: Full timeline with parallel-station awareness.
  • lead_time: End-to-end lead time with parallel flag.
  • parallel_activity: Detect overlapping station intervals per item.
  • handover_log: Extract and correlate handover events with item IDs.
  • station_statistics: Per-station/cell dwell-time stats.
  • routing_paths: Path frequency analysis with parallel flag.

Setup, Rework, and Operator Tracking:

  • SetupTimeTracking: Analyze changeover/setup durations for SMED improvement.

  • setup_durations: List every setup interval with duration.
  • setup_by_product: Setup time by product transition (from → to).
  • setup_statistics: Overall setup time stats (count, avg, median, std, % of available time).
  • setup_trend: Track setup time improvement over time.

  • OperatorPerformanceTracking: Compare production output and quality across operators.
  • production_by_operator: Parts produced per operator.
  • operator_efficiency: Operator efficiency vs per-shift target.
  • quality_by_operator: Quality metrics (FPY) per operator.
  • operator_comparison: Ranked operator performance comparison.

  • ReworkTracking: Track parts requiring rework (re-processing).

  • rework_by_shift: Rework count per shift.
  • rework_by_reason: Rework by reason code.
  • rework_rate: Rework rate as % of total production.
  • rework_cost: Convert rework counts to monetary cost.
  • rework_trend: Rework trend over time.

Performance and Target Tracking:

  • PerformanceLossTracking: Track speed losses against target cycle times.

  • performance_by_shift: Performance % per shift.
  • slow_periods: Identify windows below target performance.
  • performance_trend: Performance trend over time.

  • ScrapTracking: Track material scrap and waste.
  • scrap_by_shift: Scrap quantity per shift.
  • scrap_by_reason: Scrap by reason code.
  • scrap_cost: Convert scrap to monetary cost.
  • scrap_trend: Scrap trend over time.

  • TargetTracking: Compare any metric to targets.

  • compare_to_target: Actual vs per-shift targets.
  • target_achievement_summary: Daily target achievement.
  • target_hit_rate: Percentage of days meeting targets.

  • ShiftHandoverReport: Automated shift handover report generation.

  • generate_report: Full shift report with production, quality, downtime.
  • highlight_issues: Auto-identify metrics below thresholds.

  • PeriodSummary: Weekly/monthly summary aggregation.

  • weekly_summary: Roll up daily metrics to weekly.
  • monthly_summary: Roll up daily metrics to monthly.
  • compare_periods: Period-over-period comparison.

MachineStateEvents ¤

MachineStateEvents(
    dataframe: DataFrame,
    run_state_uuid: str,
    *,
    event_uuid: str = "prod:run_idle",
    value_column: str = "value_bool",
    time_column: str = "systime"
)

Bases: Base

Production: Machine State

Detect run/idle transitions and intervals from a boolean state signal.

  • MachineStateEvents: Run/idle state intervals and transitions.
  • detect_run_idle: Intervalize run/idle states with optional min duration filter.
  • transition_events: Point events on state changes (idle->run, run->idle).

detect_run_idle ¤

detect_run_idle(min_duration: str = '0s') -> pd.DataFrame

Return intervals labeled as 'run' or 'idle'.

  • min_duration: discard intervals shorter than this duration.

Columns: start, end, uuid, source_uuid, is_delta, state, duration_seconds
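The intervalization behind detect_run_idle can be sketched in plain pandas (a minimal sketch, not the library implementation; column names mirror the defaults above, and True is assumed to mean "run"):

```python
import pandas as pd

def intervalize_run_idle(df, min_duration="0s"):
    """Collapse a boolean state signal into labeled run/idle intervals."""
    df = df.sort_values("systime").reset_index(drop=True)
    # A new interval starts wherever the boolean value changes.
    group_id = df["value_bool"].ne(df["value_bool"].shift()).cumsum()
    intervals = df.groupby(group_id).agg(
        start=("systime", "first"),
        end=("systime", "last"),
        state=("value_bool", "first"),
    )
    intervals["state"] = intervals["state"].map({True: "run", False: "idle"})
    intervals["duration_seconds"] = (
        intervals["end"] - intervals["start"]
    ).dt.total_seconds()
    # Apply the optional minimum-duration filter.
    min_s = pd.Timedelta(min_duration).total_seconds()
    return intervals[intervals["duration_seconds"] >= min_s].reset_index(drop=True)

times = pd.date_range("2024-01-01 08:00", periods=6, freq="1min")
df = pd.DataFrame({"systime": times,
                   "value_bool": [True, True, False, False, False, True]})
print(intervalize_run_idle(df))
```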

transition_events ¤

transition_events() -> pd.DataFrame

Return point events at state transitions.

Columns: systime, uuid, source_uuid, is_delta, transition ('idle_to_run'|'run_to_idle'), time_since_last_transition_seconds

detect_rapid_transitions ¤

detect_rapid_transitions(
    threshold: str = "5s", min_count: int = 3
) -> pd.DataFrame

Identify suspicious rapid state changes.

  • threshold: time window to look for rapid transitions
  • min_count: minimum number of transitions within threshold to be considered rapid

Returns: DataFrame with start_time, end_time, transition_count, duration_seconds

state_quality_metrics ¤

state_quality_metrics() -> Dict[str, Any]

Return quality metrics for the state data.

Returns dictionary with:

  • total_transitions: total number of state transitions
  • avg_run_duration: average duration of run states in seconds
  • avg_idle_duration: average duration of idle states in seconds
  • run_idle_ratio: ratio of run time to idle time
  • data_gaps_detected: number of data gaps found
  • rapid_transitions_detected: number of rapid transition events

LineThroughputEvents ¤

LineThroughputEvents(
    dataframe: DataFrame,
    *,
    event_uuid: str = "prod:throughput",
    time_column: str = "systime"
)

Bases: Base

Production: Line Throughput

Methods:

  • count_parts: Part counts per fixed window from a monotonically increasing counter.
  • takt_adherence: Cycle time violations against a takt time from step/boolean triggers.

count_parts ¤

count_parts(
    counter_uuid: str,
    *,
    value_column: str = "value_integer",
    window: str = "1m"
) -> pd.DataFrame

Compute parts per window for a counter uuid.

Returns columns: window_start, uuid, source_uuid, is_delta, count

takt_adherence ¤

takt_adherence(
    cycle_uuid: str,
    *,
    value_column: str = "value_bool",
    takt_time: str = "60s",
    min_violation: str = "0s"
) -> pd.DataFrame

Flag cycles whose durations exceed the takt_time.

For boolean triggers: detect True rising edges as cycle boundaries. For integer steps: detect increments as cycle boundaries.

Returns: systime (at boundary), uuid, source_uuid, is_delta, cycle_time_seconds, violation
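For the boolean-trigger case, the boundary and violation logic can be sketched as follows (a simplified sketch under the assumptions above; the real method also supports integer steps and min_violation):

```python
import pandas as pd

def takt_violations(df, takt_time="60s"):
    """Flag cycles whose duration exceeds the takt time (boolean-trigger sketch)."""
    df = df.sort_values("systime")
    # Rising edges (False -> True) mark cycle boundaries.
    edges = df["value_bool"] & ~df["value_bool"].shift(fill_value=False)
    out = df.loc[edges, ["systime"]].copy()
    # Cycle time = gap between consecutive boundaries.
    out["cycle_time_seconds"] = out["systime"].diff().dt.total_seconds()
    out["violation"] = out["cycle_time_seconds"] > pd.Timedelta(takt_time).total_seconds()
    return out.dropna(subset=["cycle_time_seconds"]).reset_index(drop=True)

times = pd.to_datetime(["2024-01-01 08:00:00", "2024-01-01 08:00:05",
                        "2024-01-01 08:00:50", "2024-01-01 08:00:55",
                        "2024-01-01 08:02:10"])
df = pd.DataFrame({"systime": times,
                   "value_bool": [True, False, True, False, True]})
print(takt_violations(df))
```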

throughput_oee ¤

throughput_oee(
    counter_uuid: str,
    *,
    value_column: str = "value_integer",
    window: str = "1h",
    target_rate: Optional[float] = None,
    availability_threshold: float = 0.95
) -> pd.DataFrame

Calculate Overall Equipment Effectiveness (OEE) metrics.

OEE = Availability × Performance × Quality

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| counter_uuid | str | UUID for the part counter signal | required |
| value_column | str | Column containing counter values | 'value_integer' |
| window | str | Time window for aggregation | '1h' |
| target_rate | Optional[float] | Target production rate (parts per window). If None, uses max observed | None |
| availability_threshold | float | Threshold for considering equipment available | 0.95 |

Returns: DataFrame with columns: window_start, uuid, source_uuid, is_delta, actual_count, target_count, availability, performance, oee_score
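The OEE product is plain arithmetic; a worked example with illustrative numbers (not from the source):

```python
# Availability: the machine ran 7.2 of 8 scheduled hours.
availability = 7.2 / 8          # 0.90
# Performance: 405 parts produced vs. 450 at the ideal rate.
performance = 405 / 450         # 0.90
# Quality: 385 good parts out of 405 total.
quality = 385 / 405             # ~0.95
oee = availability * performance * quality
print(round(oee, 2))            # 0.77
```

A line can score high on any single factor and still have poor OEE; the product exposes the combined loss.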

throughput_trends ¤

throughput_trends(
    counter_uuid: str,
    *,
    value_column: str = "value_integer",
    window: str = "1h",
    trend_window: int = 24
) -> pd.DataFrame

Analyze throughput trends with moving averages and degradation detection.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| counter_uuid | str | UUID for the part counter signal | required |
| value_column | str | Column containing counter values | 'value_integer' |
| window | str | Time window for counting parts | '1h' |
| trend_window | int | Number of windows for trend calculation | 24 |

Returns: DataFrame with throughput, moving average, trend direction, and degradation flag

cycle_quality_check ¤

cycle_quality_check(
    cycle_uuid: str,
    *,
    value_column: str = "value_bool",
    expected_cycle_time: Optional[float] = None,
    tolerance_pct: float = 0.1
) -> pd.DataFrame

Enhanced cycle detection with quality validation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| cycle_uuid | str | UUID for the cycle trigger signal | required |
| value_column | str | Column containing cycle trigger (bool/integer) | 'value_bool' |
| expected_cycle_time | Optional[float] | Expected cycle time in seconds. If None, uses median | None |
| tolerance_pct | float | Tolerance percentage for cycle time validation | 0.1 |

Returns: DataFrame with cycle times, validation status, and quality flags

ChangeoverEvents ¤

ChangeoverEvents(
    dataframe: DataFrame,
    *,
    event_uuid: str = "prod:changeover",
    time_column: str = "systime"
)

Bases: Base

Production: Changeover

Detect product/recipe changes and compute changeover windows without requiring a dedicated 'first good' signal.

Methods:

  • detect_changeover: point events when product/recipe changes.
  • changeover_window: derive an end time via fixed window or 'stable_band' metrics.

detect_changeover ¤

detect_changeover(
    product_uuid: str,
    *,
    value_column: str = "value_string",
    min_hold: str = "0s"
) -> pd.DataFrame

Emit point events when the product/recipe changes value.

Uses a hold check: the new product must persist for at least min_hold until the next change.
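The change-plus-hold logic can be sketched in plain pandas (a minimal sketch, not the library implementation; note that a rejected blip is simply dropped here, whereas the real method may merge it differently):

```python
import pandas as pd

def detect_changeover(df, min_hold="0s"):
    """Point events where the product value changes and persists for min_hold."""
    df = df.sort_values("systime").reset_index(drop=True)
    changed = df["value_string"].ne(df["value_string"].shift())
    changed.iloc[0] = False  # first sample is the initial value, not a change
    changes = df.loc[changed, ["systime", "value_string"]].copy()
    # Hold check: the new value must persist until the next change for min_hold.
    next_change = changes["systime"].shift(-1)
    hold = (next_change - changes["systime"]).dt.total_seconds()
    min_s = pd.Timedelta(min_hold).total_seconds()
    keep = hold.isna() | (hold >= min_s)  # last change is held indefinitely
    return changes[keep].reset_index(drop=True)

times = pd.date_range("2024-01-01 08:00", periods=6, freq="1min")
df = pd.DataFrame({"systime": times, "value_string": list("ABCCCC")})
# With min_hold='90s', the 60-second blip to B is rejected as a false changeover.
print(detect_changeover(df, min_hold="90s"))
```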

changeover_window ¤

changeover_window(
    product_uuid: str,
    *,
    value_column: str = "value_string",
    start_time: Optional[Timestamp] = None,
    until: str = "fixed_window",
    config: Optional[Dict[str, Any]] = None,
    fallback: Optional[Dict[str, Any]] = None
) -> pd.DataFrame

Compute changeover windows per product change with enhanced configurability.

until:

  • fixed_window: end = start + config['duration'] (e.g., '10m')
  • stable_band: end when all metrics stabilize within band for hold:

    config = {
        'metrics': [
            {'uuid': 'm1', 'value_column': 'value_double', 'band': 0.2, 'hold': '2m'},
            ...
        ],
        'reference_method': 'expanding_median' | 'rolling_mean' | 'ewma' | 'target_value',
        'rolling_window': 5,                  # for rolling_mean (number of points)
        'ewma_span': 10,                      # for ewma
        'target_values': {'m1': 100.0, ...},  # for target_value
    }

fallback: {'default_duration': '10m', 'completed': False}

changeover_quality_metrics ¤

changeover_quality_metrics(
    product_uuid: str, *, value_column: str = "value_string"
) -> pd.DataFrame

Compute quality metrics for changeovers.

Returns metrics including:

  • changeover duration patterns
  • frequency statistics
  • time between changeovers
  • product-specific metrics

FlowConstraintEvents ¤

FlowConstraintEvents(
    dataframe: DataFrame,
    *,
    time_column: str = "systime",
    event_uuid: str = "prod:flow"
)

Bases: Base

Production: Flow Constraints

  • blocked_events: upstream running while downstream not consuming.
  • starved_events: downstream running while upstream not supplying.

blocked_events ¤

blocked_events(
    *,
    roles: Dict[str, str],
    tolerance: str = "200ms",
    tolerance_before: Optional[str] = None,
    tolerance_after: Optional[str] = None,
    min_duration: str = "0s"
) -> pd.DataFrame

Blocked: upstream_run=True while downstream_run=False.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| roles | Dict[str, str] | Dictionary mapping role names to UUIDs. Expected keys: 'upstream_run', 'downstream_run' | required |
| tolerance | str | Default tolerance for time alignment (used if directional tolerances not provided) | '200ms' |
| tolerance_before | Optional[str] | Tolerance for looking backward in time during alignment | None |
| tolerance_after | Optional[str] | Tolerance for looking forward in time during alignment | None |
| min_duration | str | Minimum duration for an event to be included | '0s' |

Returns: DataFrame with columns: start, end, uuid, source_uuid, is_delta, type, time_alignment_quality, duration, severity

Example

roles = {'upstream_run': 'uuid1', 'downstream_run': 'uuid2'}
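The core detection — align the two run signals in time, then intervalize samples where the upstream runs and the downstream does not — can be sketched with merge_asof (an illustrative sketch; the real method also scores alignment quality and severity):

```python
import pandas as pd

def blocked_intervals(up, down, tolerance="200ms"):
    """Sketch: intervals where upstream runs while downstream does not consume."""
    up = up.sort_values("systime")
    down = down.sort_values("systime")
    # Align the nearest downstream sample onto each upstream timestamp.
    aligned = pd.merge_asof(up, down, on="systime",
                            suffixes=("_up", "_down"),
                            direction="nearest",
                            tolerance=pd.Timedelta(tolerance))
    # Unmatched samples (NaN downstream state) are not flagged as blocked.
    aligned["blocked"] = aligned["value_bool_up"] & aligned["value_bool_down"].eq(False)
    # Collapse consecutive blocked samples into intervals.
    grp = aligned["blocked"].ne(aligned["blocked"].shift()).cumsum()
    runs = aligned.groupby(grp).agg(start=("systime", "first"),
                                    end=("systime", "last"),
                                    blocked=("blocked", "first"))
    return runs[runs["blocked"]].reset_index(drop=True)[["start", "end"]]

t = pd.date_range("2024-01-01", periods=5, freq="1s")
up = pd.DataFrame({"systime": t, "value_bool": [True] * 5})
down = pd.DataFrame({"systime": t, "value_bool": [True, False, False, True, True]})
print(blocked_intervals(up, down))
```

Starved detection is the mirror image: swap the roles and test downstream_run=True against upstream_run=False.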

starved_events ¤

starved_events(
    *,
    roles: Dict[str, str],
    tolerance: str = "200ms",
    tolerance_before: Optional[str] = None,
    tolerance_after: Optional[str] = None,
    min_duration: str = "0s"
) -> pd.DataFrame

Starved: downstream_run=True while upstream_run=False.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| roles | Dict[str, str] | Dictionary mapping role names to UUIDs. Expected keys: 'upstream_run', 'downstream_run' | required |
| tolerance | str | Default tolerance for time alignment (used if directional tolerances not provided) | '200ms' |
| tolerance_before | Optional[str] | Tolerance for looking backward in time during alignment | None |
| tolerance_after | Optional[str] | Tolerance for looking forward in time during alignment | None |
| min_duration | str | Minimum duration for an event to be included | '0s' |

Returns: DataFrame with columns: start, end, uuid, source_uuid, is_delta, type, time_alignment_quality, duration, severity

Example

roles = {'upstream_run': 'uuid1', 'downstream_run': 'uuid2'}

flow_constraint_analytics ¤

flow_constraint_analytics(
    *,
    roles: Dict[str, str],
    tolerance: str = "200ms",
    tolerance_before: Optional[str] = None,
    tolerance_after: Optional[str] = None,
    min_duration: str = "0s",
    minor_threshold: str = "5s",
    moderate_threshold: str = "30s"
) -> Dict[str, Any]

Generate comprehensive analytics for flow constraints (blockages and starvations).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| roles | Dict[str, str] | Dictionary mapping role names to UUIDs. Expected keys: 'upstream_run', 'downstream_run' | required |
| tolerance | str | Default tolerance for time alignment (used if directional tolerances not provided) | '200ms' |
| tolerance_before | Optional[str] | Tolerance for looking backward in time during alignment | None |
| tolerance_after | Optional[str] | Tolerance for looking forward in time during alignment | None |
| min_duration | str | Minimum duration for an event to be included | '0s' |
| minor_threshold | str | Duration threshold for minor severity classification | '5s' |
| moderate_threshold | str | Duration threshold for moderate severity classification | '30s' |

Returns a Dict[str, Any] containing analytics for both blocked and starved events:

  • blocked_events: DataFrame of blocked events
  • starved_events: DataFrame of starved events
  • summary: Dictionary with statistics including:
      • blocked_count: Total number of blocked events
      • starved_count: Total number of starved events
      • blocked_total_duration: Total duration of blocked events
      • starved_total_duration: Total duration of starved events
      • blocked_avg_duration: Average duration of blocked events
      • starved_avg_duration: Average duration of starved events
      • blocked_severity_breakdown: Count by severity level
      • starved_severity_breakdown: Count by severity level
      • overall_alignment_quality: Average alignment quality across both types
Example

    roles = {'upstream_run': 'uuid1', 'downstream_run': 'uuid2'}
    analytics = flow.flow_constraint_analytics(roles=roles)
    print(analytics['summary']['blocked_count'])

PartProductionTracking ¤

PartProductionTracking(
    dataframe: DataFrame, *, time_column: str = "systime"
)

Bases: Base

Track production quantities by part number.

Each UUID represents one signal:

  • part_id_uuid: string signal with current part number
  • counter_uuid: monotonic counter for production count

Example usage

    tracker = PartProductionTracking(df)

    # Hourly production by part
    hourly = tracker.production_by_part(
        part_id_uuid='part_number_signal',
        counter_uuid='counter_signal',
        window='1h',
    )

    # Daily summary
    daily = tracker.daily_production_summary(
        part_id_uuid='part_number_signal',
        counter_uuid='counter_signal',
    )

Initialize part production tracker.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dataframe | DataFrame | Input DataFrame with timeseries data | required |
| time_column | str | Name of timestamp column | 'systime' |

production_by_part ¤

production_by_part(
    part_id_uuid: str,
    counter_uuid: str,
    *,
    window: str = "1h",
    value_column_part: str = "value_string",
    value_column_counter: str = "value_integer"
) -> pd.DataFrame

Calculate production quantity per part number.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| part_id_uuid | str | UUID for part number signal | required |
| counter_uuid | str | UUID for production counter | required |
| window | str | Time window for aggregation (e.g., '1h', '8h', '1d') | '1h' |
| value_column_part | str | Column containing part numbers | 'value_string' |
| value_column_counter | str | Column containing counter values | 'value_integer' |

Returns a DataFrame with columns:

  • window_start: Start of time window
  • part_number: Part number/ID
  • quantity: Parts produced in window
  • first_count: Counter value at window start
  • last_count: Counter value at window end

Example

    production_by_part('part_id', 'counter', window='1h')

              window_start part_number  quantity  first_count  last_count
    0  2024-01-01 08:00:00      PART_A       150         1000        1150
    1  2024-01-01 09:00:00      PART_A       145         1150        1295
    2  2024-01-01 10:00:00      PART_B        98         1295        1393
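The attribution step — crediting each counter increment to the part number active at that moment — can be sketched with a backward as-of join (illustrative only; the real method also reports first_count/last_count and handles counter resets):

```python
import pandas as pd

def production_by_part(parts, counter, window="1h"):
    """Sketch: attribute counter increments to the part active at the time."""
    parts = parts.sort_values("systime")
    counter = counter.sort_values("systime")
    # Tag every counter sample with the most recent part number.
    merged = pd.merge_asof(counter, parts, on="systime", direction="backward")
    # Increment since the previous sample (first sample has no predecessor).
    merged["quantity"] = merged["value_integer"].diff().fillna(0)
    merged["window_start"] = merged["systime"].dt.floor(window)
    return (merged.groupby(["window_start", "value_string"], as_index=False)["quantity"]
                  .sum()
                  .rename(columns={"value_string": "part_number"}))

parts = pd.DataFrame({"systime": pd.to_datetime(["2024-01-01 08:00",
                                                 "2024-01-01 09:30"]),
                      "value_string": ["PART_A", "PART_B"]})
counter = pd.DataFrame({"systime": pd.to_datetime(["2024-01-01 08:10",
                                                   "2024-01-01 08:40",
                                                   "2024-01-01 09:40",
                                                   "2024-01-01 09:50"]),
                        "value_integer": [10, 25, 40, 52]})
print(production_by_part(parts, counter))
```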

daily_production_summary ¤

daily_production_summary(
    part_id_uuid: str,
    counter_uuid: str,
    *,
    value_column_part: str = "value_string",
    value_column_counter: str = "value_integer"
) -> pd.DataFrame

Daily production summary by part number.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| part_id_uuid | str | UUID for part number signal | required |
| counter_uuid | str | UUID for production counter | required |
| value_column_part | str | Column containing part numbers | 'value_string' |
| value_column_counter | str | Column containing counter values | 'value_integer' |

Returns a DataFrame with columns:

  • date: Production date
  • part_number: Part number/ID
  • total_quantity: Total parts produced that day
  • hours_active: Number of hours with production

Example

    daily_production_summary('part_id', 'counter')

             date part_number  total_quantity  hours_active
    0  2024-01-01      PART_A            1200             8
    1  2024-01-01      PART_B             850             6
    2  2024-01-02      PART_A            1150             8

production_totals ¤

production_totals(
    part_id_uuid: str,
    counter_uuid: str,
    *,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    value_column_part: str = "value_string",
    value_column_counter: str = "value_integer"
) -> pd.DataFrame

Total production by part number for a date range.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| part_id_uuid | str | UUID for part number signal | required |
| counter_uuid | str | UUID for production counter | required |
| start_date | Optional[str] | Start date 'YYYY-MM-DD' (optional) | None |
| end_date | Optional[str] | End date 'YYYY-MM-DD' (optional) | None |
| value_column_part | str | Column containing part numbers | 'value_string' |
| value_column_counter | str | Column containing counter values | 'value_integer' |

Returns: DataFrame with total production per part

Example

    production_totals('part_id', 'counter',
                      start_date='2024-01-01', end_date='2024-01-07')

      part_number  total_quantity  days_produced
    0      PART_A            8450              5
    1      PART_B            6200              4

CycleTimeTracking ¤

CycleTimeTracking(
    dataframe: DataFrame, *, time_column: str = "systime"
)

Bases: Base

Track cycle times by part number.

Each UUID represents one signal:

  • part_id_uuid: string signal with current part number
  • cycle_trigger_uuid: boolean/integer signal for cycle completion

Example usage

    tracker = CycleTimeTracking(df)

    # Get cycle times by part
    cycles = tracker.cycle_time_by_part(
        part_id_uuid='part_number_signal',
        cycle_trigger_uuid='cycle_complete_signal',
    )

    # Get statistics
    stats = tracker.cycle_time_statistics(
        part_id_uuid='part_number_signal',
        cycle_trigger_uuid='cycle_complete_signal',
    )

Initialize cycle time tracker.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dataframe | DataFrame | Input DataFrame with timeseries data | required |
| time_column | str | Name of timestamp column | 'systime' |

cycle_time_by_part ¤

cycle_time_by_part(
    part_id_uuid: str,
    cycle_trigger_uuid: str,
    *,
    value_column_part: str = "value_string",
    value_column_trigger: str = "value_bool"
) -> pd.DataFrame

Calculate cycle time for each part number.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| part_id_uuid | str | UUID for part number signal | required |
| cycle_trigger_uuid | str | UUID for cycle completion trigger | required |
| value_column_part | str | Column containing part numbers | 'value_string' |
| value_column_trigger | str | Column containing cycle triggers | 'value_bool' |

Returns a DataFrame with columns:

  • systime: Cycle completion time
  • part_number: Part number/ID
  • cycle_time_seconds: Cycle time in seconds

Example

    cycle_time_by_part('part_id', 'cycle_trigger')

                   systime part_number  cycle_time_seconds
    0  2024-01-01 08:05:30      PART_A                45.2
    1  2024-01-01 08:06:18      PART_A                48.0
    2  2024-01-01 08:07:05      PART_A                47.1
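The underlying computation — cycle time as the gap between consecutive trigger rising edges, tagged with the part active at completion — can be sketched as (a simplified sketch for the boolean-trigger case):

```python
import pandas as pd

def cycle_time_by_part(parts, triggers):
    """Sketch: cycle time = gap between consecutive rising edges, tagged by part."""
    triggers = triggers.sort_values("systime")
    # Rising edges (False -> True) mark cycle completions.
    edges = triggers["value_bool"] & ~triggers["value_bool"].shift(fill_value=False)
    out = triggers.loc[edges, ["systime"]].copy()
    out["cycle_time_seconds"] = out["systime"].diff().dt.total_seconds()
    # Tag each completed cycle with the part active at completion time.
    out = pd.merge_asof(out, parts.sort_values("systime"),
                        on="systime", direction="backward")
    return (out.rename(columns={"value_string": "part_number"})
               .dropna(subset=["cycle_time_seconds"])
               .reset_index(drop=True))

parts = pd.DataFrame({"systime": pd.to_datetime(["2024-01-01 08:00:00"]),
                      "value_string": ["PART_A"]})
trig_times = pd.to_datetime(["2024-01-01 08:00:00", "2024-01-01 08:00:01",
                             "2024-01-01 08:00:45", "2024-01-01 08:00:46",
                             "2024-01-01 08:01:33"])
triggers = pd.DataFrame({"systime": trig_times,
                         "value_bool": [True, False, True, False, True]})
print(cycle_time_by_part(parts, triggers))
```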

cycle_time_statistics ¤

cycle_time_statistics(
    part_id_uuid: str,
    cycle_trigger_uuid: str,
    *,
    value_column_part: str = "value_string",
    value_column_trigger: str = "value_bool"
) -> pd.DataFrame

Calculate statistics: min, avg, max, std cycle time by part.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| part_id_uuid | str | UUID for part number signal | required |
| cycle_trigger_uuid | str | UUID for cycle completion trigger | required |
| value_column_part | str | Column containing part numbers | 'value_string' |
| value_column_trigger | str | Column containing cycle triggers | 'value_bool' |

Returns a DataFrame with statistics per part:

  • part_number: Part number/ID
  • count: Number of cycles
  • min_seconds: Minimum cycle time
  • avg_seconds: Average cycle time
  • max_seconds: Maximum cycle time
  • std_seconds: Standard deviation
  • median_seconds: Median cycle time (robust target)

Example

    cycle_time_statistics('part_id', 'cycle_trigger')

      part_number  count  min_seconds  avg_seconds  max_seconds  std_seconds  median_seconds
    0      PART_A    450         42.1         47.5         58.2          3.2            47.1
    1      PART_B    320         55.0         62.8         78.5          5.1            61.9

detect_slow_cycles ¤

detect_slow_cycles(
    part_id_uuid: str,
    cycle_trigger_uuid: str,
    *,
    threshold_factor: float = 1.5,
    value_column_part: str = "value_string",
    value_column_trigger: str = "value_bool"
) -> pd.DataFrame

Identify cycles that exceed normal time by threshold factor.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| part_id_uuid | str | UUID for part number signal | required |
| cycle_trigger_uuid | str | UUID for cycle completion trigger | required |
| threshold_factor | float | Cycles slower than median * factor are flagged | 1.5 |
| value_column_part | str | Column containing part numbers | 'value_string' |
| value_column_trigger | str | Column containing cycle triggers | 'value_bool' |

Returns a DataFrame with slow cycles:

  • systime: Cycle completion time
  • part_number: Part number/ID
  • cycle_time_seconds: Actual cycle time
  • median_seconds: Expected (median) cycle time
  • deviation_factor: How many times slower than expected
  • is_slow: Flag (always True in returned data)

Example

    detect_slow_cycles('part_id', 'cycle_trigger', threshold_factor=1.5)

                   systime part_number  cycle_time_seconds  median_seconds  deviation_factor  is_slow
    0  2024-01-01 10:15:30      PART_A                75.2            47.1              1.60     True
    1  2024-01-01 14:22:18      PART_A                82.5            47.1              1.75     True
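The median-based anomaly rule itself is small enough to sketch directly (illustrative only; input here is an already-computed cycle-time table rather than raw signals):

```python
import pandas as pd

def detect_slow_cycles(cycles, threshold_factor=1.5):
    """Sketch: flag cycles slower than the per-part median * threshold_factor."""
    out = cycles.copy()
    # Median cycle time per part serves as the robust 'expected' value.
    out["median_seconds"] = (out.groupby("part_number")["cycle_time_seconds"]
                                .transform("median"))
    out["deviation_factor"] = out["cycle_time_seconds"] / out["median_seconds"]
    slow = out[out["cycle_time_seconds"]
               > out["median_seconds"] * threshold_factor].copy()
    slow["is_slow"] = True
    return slow.reset_index(drop=True)

cycles = pd.DataFrame({"part_number": ["PART_A"] * 5,
                       "cycle_time_seconds": [45.0, 47.0, 47.0, 48.0, 75.0]})
print(detect_slow_cycles(cycles))
```

Using the median rather than the mean keeps the baseline stable even when occasional very slow cycles are present.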

cycle_time_trend ¤

cycle_time_trend(
    part_id_uuid: str,
    cycle_trigger_uuid: str,
    part_number: str,
    *,
    window_size: int = 20,
    value_column_part: str = "value_string",
    value_column_trigger: str = "value_bool"
) -> pd.DataFrame

Analyze cycle time trends for a specific part.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| part_id_uuid | str | UUID for part number signal | required |
| cycle_trigger_uuid | str | UUID for cycle completion trigger | required |
| part_number | str | Specific part number to analyze | required |
| window_size | int | Number of cycles for moving average | 20 |
| value_column_part | str | Column containing part numbers | 'value_string' |
| value_column_trigger | str | Column containing cycle triggers | 'value_bool' |

Returns a DataFrame with trend data:

  • systime: Cycle completion time
  • cycle_time_seconds: Actual cycle time
  • moving_avg: Moving average cycle time
  • trend: 'improving', 'stable', or 'degrading'

Example

    cycle_time_trend('part_id', 'cycle_trigger', 'PART_A')

                   systime  cycle_time_seconds  moving_avg      trend
    0  2024-01-01 08:05:30                45.2        47.1  improving
    1  2024-01-01 08:06:18                48.0        47.2     stable
    2  2024-01-01 08:07:05                47.1        47.1     stable

hourly_cycle_time_summary ¤

hourly_cycle_time_summary(
    part_id_uuid: str,
    cycle_trigger_uuid: str,
    *,
    value_column_part: str = "value_string",
    value_column_trigger: str = "value_bool"
) -> pd.DataFrame

Hourly summary of cycle times by part.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| part_id_uuid | str | UUID for part number signal | required |
| cycle_trigger_uuid | str | UUID for cycle completion trigger | required |
| value_column_part | str | Column containing part numbers | 'value_string' |
| value_column_trigger | str | Column containing cycle triggers | 'value_bool' |

Returns a DataFrame with hourly statistics:

  • hour: Hour timestamp
  • part_number: Part number/ID
  • cycles_completed: Number of cycles
  • avg_cycle_time: Average cycle time
  • min_cycle_time: Fastest cycle
  • max_cycle_time: Slowest cycle

Example

    hourly_cycle_time_summary('part_id', 'cycle_trigger')

                      hour part_number  cycles_completed  avg_cycle_time  min_cycle_time  max_cycle_time
    0  2024-01-01 08:00:00      PART_A                75            47.2            42.1            55.8
    1  2024-01-01 09:00:00      PART_A                78            46.8            43.0            52.3

ShiftReporting ¤

ShiftReporting(
    dataframe: DataFrame,
    *,
    time_column: str = "systime",
    shift_definitions: Optional[
        Dict[str, Tuple[str, str]]
    ] = None
)

Bases: Base

Simple shift-based production reporting.

Each UUID represents one signal:

  • counter_uuid: production counter
  • part_id_uuid: part number (optional)

Merge keys: [date, shift] for shift-level outputs.

Pipeline example::

reporter = ShiftReporting(df)
prod = reporter.shift_production('counter')
# → merge with QualityTracking.nok_by_shift() on [date, shift]
# → merge with DowntimeTracking.downtime_by_shift() on [date, shift]
# → feed combined DataFrame into ShiftHandoverReport.from_shift_data()
Example usage

    reporter = ShiftReporting(df, shift_definitions={
        "day": ("06:00", "14:00"),
        "afternoon": ("14:00", "22:00"),
        "night": ("22:00", "06:00"),
    })

    # Production per shift
    shift_prod = reporter.shift_production(
        counter_uuid='counter_signal',
        part_id_uuid='part_number_signal',
    )

    # Compare shifts
    comparison = reporter.shift_comparison(counter_uuid='counter_signal')

Initialize shift reporter.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dataframe | DataFrame | Input DataFrame with timeseries data | required |
| time_column | str | Name of timestamp column | 'systime' |
| shift_definitions | Optional[Dict[str, Tuple[str, str]]] | Dictionary mapping shift names to (start, end) times. Default: 3-shift operation (06:00-14:00, 14:00-22:00, 22:00-06:00) | None |

Example shift_definitions

    {
        "shift_1": ("06:00", "14:00"),
        "shift_2": ("14:00", "22:00"),
        "shift_3": ("22:00", "06:00"),
    }
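Assigning a timestamp to a shift has one subtlety: the overnight shift wraps past midnight, so samples after midnight belong to the previous day's shift. A sketch of that assignment (illustrative; the library's own bucketing may differ):

```python
from datetime import datetime

import pandas as pd

SHIFTS = {
    "shift_1": ("06:00", "14:00"),
    "shift_2": ("14:00", "22:00"),
    "shift_3": ("22:00", "06:00"),  # wraps past midnight
}

def assign_shift(ts, shifts=SHIFTS):
    """Return (shift_name, production_date) for a timestamp."""
    t = ts.time()
    for name, (start, end) in shifts.items():
        s = datetime.strptime(start, "%H:%M").time()
        e = datetime.strptime(end, "%H:%M").time()
        if s < e:                      # ordinary daytime shift
            if s <= t < e:
                return name, ts.date()
        else:                          # overnight shift wrapping midnight
            if t >= s:                 # before midnight: today's shift
                return name, ts.date()
            if t < e:                  # after midnight: still yesterday's shift
                return name, (ts - pd.Timedelta(days=1)).date()
    return None, ts.date()

# 02:30 belongs to the night shift that started the previous evening.
print(assign_shift(pd.Timestamp("2024-01-02 02:30")))
```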

shift_production ¤

shift_production(
    counter_uuid: str,
    part_id_uuid: Optional[str] = None,
    *,
    value_column_counter: str = "value_integer",
    value_column_part: str = "value_string",
    date: Optional[str] = None
) -> pd.DataFrame

Production quantity per shift.

Parameters:

Name Type Description Default
counter_uuid str

Production counter UUID

required
part_id_uuid Optional[str]

Part number UUID (optional, for part-specific production)

None
value_column_counter str

Column containing counter values

'value_integer'
value_column_part str

Column containing part numbers

'value_string'
date Optional[str]

Specific date in 'YYYY-MM-DD' format (optional)

None

Returns:

Type Description
DataFrame

DataFrame with production by shift:

  • date: Production date
  • shift: Shift name
  • part_number: Part number (if part_id_uuid provided)
  • quantity: Parts produced
Example

shift_production('counter', part_id_uuid='part_id')
         date    shift part_number  quantity
0  2024-01-01  shift_1      PART_A       450
1  2024-01-01  shift_2      PART_A       425
2  2024-01-01  shift_3      PART_A       380
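Quantities derived from a monotonic counter are first-to-last differences per group, and a counter reset (the value dropping back toward zero) must not be counted as negative production. A minimal sketch of that delta logic with sample data, not necessarily how this class implements it:

```python
import pandas as pd

# Monotonic part counter; the value resets after 210 (e.g. machine restart).
counter = pd.Series(
    [100, 150, 210, 5, 60],
    index=pd.to_datetime([
        "2024-01-01 06:10", "2024-01-01 09:00", "2024-01-01 13:50",
        "2024-01-01 14:05", "2024-01-01 20:00",
    ]),
)

# Per-sample increments; a negative diff marks a reset, so count the new value instead.
inc = counter.diff()
inc[inc < 0] = counter[inc < 0]
quantity = int(inc.sum())  # parts produced over the whole span
```

Grouping the increments by date/shift before summing yields the per-shift quantities shown above.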

shift_comparison ¤

shift_comparison(
    counter_uuid: str,
    *,
    value_column_counter: str = "value_integer",
    days: int = 7
) -> pd.DataFrame

Compare shift performance over recent days.

Parameters:

Name Type Description Default
counter_uuid str

Production counter UUID

required
value_column_counter str

Column containing counter values

'value_integer'
days int

Number of recent days to analyze (default: 7)

7

Returns:

Type Description
DataFrame

DataFrame with shift comparison:

  • shift: Shift name
  • avg_quantity: Average production per shift
  • min_quantity: Minimum production
  • max_quantity: Maximum production
  • std_quantity: Standard deviation
  • days_count: Number of days included
Example

shift_comparison('counter', days=7)
     shift  avg_quantity  min_quantity  max_quantity  std_quantity  days_count
0  shift_1           445           420           465          15.2           7
1  shift_2           430           405           450          12.8           7
2  shift_3           385           360           410          18.5           7

shift_targets ¤

shift_targets(
    counter_uuid: str,
    targets: Dict[str, float],
    *,
    value_column_counter: str = "value_integer",
    date: Optional[str] = None
) -> pd.DataFrame

Compare actual production to shift targets.

Parameters:

Name Type Description Default
counter_uuid str

Production counter UUID

required
targets Dict[str, float]

Dictionary mapping shift names to target quantities

required
value_column_counter str

Column containing counter values

'value_integer'
date Optional[str]

Specific date in 'YYYY-MM-DD' format (optional)

None

Returns:

Type Description
DataFrame

DataFrame with target comparison:

  • date: Production date
  • shift: Shift name
  • actual: Actual production
  • target: Target production
  • variance: Difference (actual - target)
  • achievement_pct: Percentage of target achieved
Example

shift_targets('counter', targets={'shift_1': 450, 'shift_2': 450, 'shift_3': 400})
         date    shift  actual  target  variance  achievement_pct
0  2024-01-01  shift_1     445     450        -5             98.9
1  2024-01-01  shift_2     465     450        15            103.3
2  2024-01-01  shift_3     390     400       -10             97.5
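The variance and achievement columns are plain arithmetic over the per-shift totals. A sketch of the same computation in pandas, with sample numbers only:

```python
import pandas as pd

actual = pd.DataFrame({
    "shift": ["shift_1", "shift_2", "shift_3"],
    "actual": [445, 465, 390],
})
targets = {"shift_1": 450, "shift_2": 450, "shift_3": 400}

# Map each shift to its target, then derive variance and achievement.
report = actual.assign(
    target=actual["shift"].map(targets),
    variance=lambda d: d["actual"] - d["target"],
    achievement_pct=lambda d: (d["actual"] / d["target"] * 100).round(1),
)
```

Shifts missing from the `targets` dict would get NaN here; the real method presumably handles that case.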

best_and_worst_shifts ¤

best_and_worst_shifts(
    counter_uuid: str,
    *,
    value_column_counter: str = "value_integer",
    days: int = 30
) -> Dict[str, pd.DataFrame]

Identify best and worst performing shifts.

Parameters:

Name Type Description Default
counter_uuid str

Production counter UUID

required
value_column_counter str

Column containing counter values

'value_integer'
days int

Number of recent days to analyze (default: 30)

30

Returns:

Type Description
Dict[str, DataFrame]

Dictionary with:

  • 'best': Top 5 best shifts
  • 'worst': Top 5 worst shifts
Example

results = best_and_worst_shifts('counter')
results['best']
         date    shift  quantity
0  2024-01-15  shift_2       495
1  2024-01-18  shift_1       490
2  2024-01-22  shift_2       485

DowntimeTracking ¤

DowntimeTracking(
    dataframe: DataFrame,
    *,
    time_column: str = "systime",
    shift_definitions: Optional[
        Dict[str, tuple[str, str]]
    ] = None
)

Bases: Base

Track machine downtimes by shift and reason.

Each UUID represents one signal:

  • state_uuid: machine state (running/stopped/idle)
  • reason_uuid: downtime reason code (optional)

Merge keys: [date, shift] for shift-level, [period] for trend data.

Pipeline example::

downtime = DowntimeTracking(df)
shift_dt = downtime.downtime_by_shift('machine_state')
# → merge with QualityTracking.nok_by_shift() on [date, shift]
# → merge with ShiftReporting.shift_production() on [date, shift]
# → feed into ShiftHandoverReport.from_shift_data()
Example usage

tracker = DowntimeTracking(df)

Downtime per shift¤

shift_downtime = tracker.downtime_by_shift(
    state_uuid='machine_state',
    running_value='Running',
)

Downtime by reason¤

reason_analysis = tracker.downtime_by_reason(
    state_uuid='machine_state',
    reason_uuid='downtime_reason',
    stopped_value='Stopped',
)

Initialize downtime tracker.

Parameters:

Name Type Description Default
dataframe DataFrame

Input DataFrame with timeseries data

required
time_column str

Name of timestamp column (default: 'systime')

'systime'
shift_definitions Optional[Dict[str, tuple[str, str]]]

Dictionary mapping shift names to (start, end) times Default: 3-shift operation (06:00-14:00, 14:00-22:00, 22:00-06:00)

None

downtime_by_shift ¤

downtime_by_shift(
    state_uuid: str,
    *,
    running_value: str = "Running",
    value_column: str = "value_string"
) -> pd.DataFrame

Calculate downtime duration per shift.

Parameters:

Name Type Description Default
state_uuid str

UUID for machine state signal

required
running_value str

Value that indicates machine is running

'Running'
value_column str

Column containing state values

'value_string'

Returns:

Type Description
DataFrame

DataFrame with downtime by shift:

  • date: Production date
  • shift: Shift name
  • total_minutes: Total shift duration in minutes
  • downtime_minutes: Downtime duration in minutes
  • uptime_minutes: Running time in minutes
  • availability_pct: Uptime percentage
Example

downtime_by_shift('machine_state', running_value='Running')
         date    shift  total_minutes  downtime_minutes  uptime_minutes  availability_pct
0  2024-01-01  shift_1            480              45.2           434.8              90.6
1  2024-01-01  shift_2            480              67.5           412.5              85.9
2  2024-01-01  shift_3            480              92.0           388.0              80.8
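Durations come from state-change samples: each sample's state is assumed to hold until the next sample arrives. A minimal sketch of that interval attribution (a forward-looking duration per row), not necessarily the class's actual implementation:

```python
import pandas as pd

states = pd.DataFrame({
    "systime": pd.to_datetime(["2024-01-01 08:00", "2024-01-01 08:30",
                               "2024-01-01 08:45", "2024-01-01 09:00"]),
    "value_string": ["Running", "Stopped", "Running", "Running"],
}).sort_values("systime")

# Minutes each state persists = time until the next sample (last row gets 0).
minutes = states["systime"].diff().shift(-1).dt.total_seconds().fillna(0) / 60

downtime_minutes = minutes[states["value_string"] != "Running"].sum()
uptime_minutes = minutes[states["value_string"] == "Running"].sum()
```

Splitting the rows by date and shift before summing gives the per-shift table above; availability_pct is then uptime over the total.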

downtime_by_reason ¤

downtime_by_reason(
    state_uuid: str,
    reason_uuid: str,
    *,
    stopped_value: str = "Stopped",
    value_column_state: str = "value_string",
    value_column_reason: str = "value_string"
) -> pd.DataFrame

Analyze downtime by reason code.

Parameters:

Name Type Description Default
state_uuid str

UUID for machine state signal

required
reason_uuid str

UUID for downtime reason signal

required
stopped_value str

Value indicating machine is stopped

'Stopped'
value_column_state str

Column containing state values

'value_string'
value_column_reason str

Column containing reason codes

'value_string'

Returns:

Type Description
DataFrame

DataFrame with downtime by reason:

  • reason: Reason code
  • occurrences: Number of downtime events
  • total_minutes: Total downtime for this reason
  • avg_minutes: Average downtime per occurrence
  • pct_of_total: Percentage of total downtime
Example

downtime_by_reason('state', 'reason', stopped_value='Stopped')
              reason  occurrences  total_minutes  avg_minutes  pct_of_total
0  Material_Shortage           12          145.5         12.1          35.2
1        Tool_Change            8           98.2         12.3          23.8
2      Quality_Issue            5           76.0         15.2          18.4

top_downtime_reasons ¤

top_downtime_reasons(
    state_uuid: str,
    reason_uuid: str,
    *,
    top_n: int = 5,
    stopped_value: str = "Stopped",
    value_column_state: str = "value_string",
    value_column_reason: str = "value_string"
) -> pd.DataFrame

Get top N downtime reasons (Pareto analysis).

Parameters:

Name Type Description Default
state_uuid str

UUID for machine state signal

required
reason_uuid str

UUID for downtime reason signal

required
top_n int

Number of top reasons to return

5
stopped_value str

Value indicating machine is stopped

'Stopped'
value_column_state str

Column containing state values

'value_string'
value_column_reason str

Column containing reason codes

'value_string'

Returns:

Type Description
DataFrame

DataFrame with top N reasons and cumulative percentage

Example

top_downtime_reasons('state', 'reason', top_n=5)
              reason  total_minutes  pct_of_total  cumulative_pct
0  Material_Shortage          145.5          35.2            35.2
1        Tool_Change           98.2          23.8            59.0
2      Quality_Issue           76.0          18.4            77.4
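The Pareto ranking is a descending sort plus a cumulative sum over the percentage column. A sketch with sample numbers (not the doc's dataset):

```python
import pandas as pd

by_reason = pd.DataFrame({
    "reason": ["Tool_Change", "Material_Shortage", "Quality_Issue", "Other"],
    "total_minutes": [98.2, 145.5, 76.0, 40.3],
})

# Sort worst-first, then express each reason as a share of total downtime.
pareto = by_reason.sort_values("total_minutes", ascending=False).reset_index(drop=True)
pareto["pct_of_total"] = (pareto["total_minutes"] / pareto["total_minutes"].sum() * 100).round(1)
pareto["cumulative_pct"] = pareto["pct_of_total"].cumsum().round(1)

top_3 = pareto.head(3)
```

Reading off where cumulative_pct crosses ~80% shows how few reasons dominate the losses (the 80/20 rule).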

availability_trend ¤

availability_trend(
    state_uuid: str,
    *,
    running_value: str = "Running",
    value_column: str = "value_string",
    window: str = "1D"
) -> pd.DataFrame

Calculate availability trend over time.

Parameters:

Name Type Description Default
state_uuid str

UUID for machine state signal

required
running_value str

Value that indicates machine is running

'Running'
value_column str

Column containing state values

'value_string'
window str

Time window for aggregation (e.g., '1D', '1W')

'1D'

Returns:

Type Description
DataFrame

DataFrame with availability trend:

  • period: Time period
  • availability_pct: Availability percentage
  • uptime_minutes: Total uptime
  • downtime_minutes: Total downtime
Example

availability_trend('state', window='1D')
       period  availability_pct  uptime_minutes  downtime_minutes
0  2024-01-01              87.5          1260.0             180.0
1  2024-01-02              91.2          1313.3             126.7
2  2024-01-03              85.8          1235.5             204.5

QualityTracking ¤

QualityTracking(
    dataframe: DataFrame,
    *,
    time_column: str = "systime",
    shift_definitions: Optional[
        Dict[str, tuple[str, str]]
    ] = None
)

Bases: Base

Track NOK (defective) parts and quality metrics.

Each UUID represents one signal:

  • ok_counter_uuid: counter for good parts
  • nok_counter_uuid: counter for defective parts
  • part_id_uuid: part number signal (optional)
  • defect_reason_uuid: defect reason code (optional)

Merge keys: [date, shift] for shift-level, [date] for daily, [part_number] for part-level.

Pipeline outputs include quality_pct (alias for first_pass_yield_pct) so downstream modules (ShiftHandoverReport, PeriodSummary, OEECalculator) can join on a consistent column name.

Example usage

tracker = QualityTracking(df)

NOK parts per shift¤

shift_nok = tracker.nok_by_shift(
    ok_counter_uuid='good_parts',
    nok_counter_uuid='bad_parts',
)

Quality by part number¤

part_quality = tracker.quality_by_part(
    ok_counter_uuid='good_parts',
    nok_counter_uuid='bad_parts',
    part_id_uuid='part_number',
)

Pipeline: feed into PeriodSummary¤

daily = tracker.daily_quality_summary('good', 'bad')

daily has [date, ok_parts, nok_parts, total_parts, quality_pct, ...]¤

Initialize quality tracker.

Parameters:

Name Type Description Default
dataframe DataFrame

Input DataFrame with timeseries data

required
time_column str

Name of timestamp column (default: 'systime')

'systime'
shift_definitions Optional[Dict[str, tuple[str, str]]]

Dictionary mapping shift names to (start, end) times Default: 3-shift operation (06:00-14:00, 14:00-22:00, 22:00-06:00)

None

nok_by_shift ¤

nok_by_shift(
    ok_counter_uuid: str,
    nok_counter_uuid: str,
    *,
    value_column: str = "value_integer"
) -> pd.DataFrame

Calculate NOK (defective) parts per shift.

Parameters:

Name Type Description Default
ok_counter_uuid str

UUID for good parts counter

required
nok_counter_uuid str

UUID for defective parts counter

required
value_column str

Column containing counter values

'value_integer'

Returns:

Type Description
DataFrame

DataFrame with quality metrics by shift:

  • date: Production date
  • shift: Shift name
  • ok_parts: Good parts produced
  • nok_parts: Defective parts
  • total_parts: Total parts produced
  • nok_rate_pct: Percentage of defective parts
  • first_pass_yield_pct: Percentage of good parts
Example

nok_by_shift('good_counter', 'bad_counter')
         date    shift  ok_parts  nok_parts  total_parts  nok_rate_pct  first_pass_yield_pct
0  2024-01-01  shift_1       450         12          462           2.6                  97.4
1  2024-01-01  shift_2       425         18          443           4.1                  95.9
2  2024-01-01  shift_3       380         25          405           6.2                  93.8

quality_by_part ¤

quality_by_part(
    ok_counter_uuid: str,
    nok_counter_uuid: str,
    part_id_uuid: str,
    *,
    value_column_counter: str = "value_integer",
    value_column_part: str = "value_string"
) -> pd.DataFrame

Calculate quality metrics by part number.

Parameters:

Name Type Description Default
ok_counter_uuid str

UUID for good parts counter

required
nok_counter_uuid str

UUID for defective parts counter

required
part_id_uuid str

UUID for part number signal

required
value_column_counter str

Column containing counter values

'value_integer'
value_column_part str

Column containing part numbers

'value_string'

Returns:

Type Description
DataFrame

DataFrame with quality by part:

  • part_number: Part number/ID
  • ok_parts: Good parts produced
  • nok_parts: Defective parts
  • total_parts: Total parts produced
  • nok_rate_pct: Percentage of defective parts
  • first_pass_yield_pct: Percentage of good parts
Example

quality_by_part('good', 'bad', 'part_id')
  part_number  ok_parts  nok_parts  total_parts  nok_rate_pct  first_pass_yield_pct
0      PART_A      1255         55         1310           4.2                  95.8
1      PART_B       890         38          928           4.1                  95.9

nok_by_reason ¤

nok_by_reason(
    nok_counter_uuid: str,
    defect_reason_uuid: str,
    *,
    value_column_counter: str = "value_integer",
    value_column_reason: str = "value_string"
) -> pd.DataFrame

Analyze NOK parts by defect reason.

Parameters:

Name Type Description Default
nok_counter_uuid str

UUID for defective parts counter

required
defect_reason_uuid str

UUID for defect reason signal

required
value_column_counter str

Column containing counter values

'value_integer'
value_column_reason str

Column containing reason codes

'value_string'

Returns:

Type Description
DataFrame

DataFrame with NOK by reason:

  • reason: Defect reason code
  • nok_parts: Number of defective parts
  • pct_of_total: Percentage of total NOK
Example

nok_by_reason('bad_parts', 'defect_reason')
            reason  nok_parts  pct_of_total
0  Dimension_Error         45          40.5
1   Surface_Defect         28          25.2
2      Wrong_Color         22          19.8

daily_quality_summary ¤

daily_quality_summary(
    ok_counter_uuid: str,
    nok_counter_uuid: str,
    *,
    value_column: str = "value_integer"
) -> pd.DataFrame

Daily quality summary.

Parameters:

Name Type Description Default
ok_counter_uuid str

UUID for good parts counter

required
nok_counter_uuid str

UUID for defective parts counter

required
value_column str

Column containing counter values

'value_integer'

Returns:

Type Description
DataFrame

DataFrame with daily quality:

  • date: Production date
  • ok_parts: Good parts produced
  • nok_parts: Defective parts
  • total_parts: Total parts produced
  • nok_rate_pct: Percentage of defective parts
  • first_pass_yield_pct: Percentage of good parts
Example

daily_quality_summary('good', 'bad')
         date  ok_parts  nok_parts  total_parts  nok_rate_pct  first_pass_yield_pct
0  2024-01-01      1255         55         1310           4.2                  95.8
1  2024-01-02      1308         42         1350           3.1                  96.9
2  2024-01-03      1290         60         1350           4.4                  95.6

OEECalculator ¤

OEECalculator(
    dataframe: DataFrame, *, time_column: str = "systime"
)

Bases: Base

Calculate Overall Equipment Effectiveness from timeseries signals.

Combines availability (from run/idle state), performance (from part counters and ideal cycle time), and quality (from total/reject counters) into a single OEE metric per day.

Example usage

oee = OEECalculator(df)

Individual components¤

avail = oee.calculate_availability('machine_state')
perf = oee.calculate_performance('part_counter', ideal_cycle_time=30.0, run_state_uuid='machine_state')
qual = oee.calculate_quality('total_counter', 'reject_counter')

Combined daily OEE¤

daily = oee.calculate_oee(
    run_state_uuid='machine_state',
    counter_uuid='part_counter',
    ideal_cycle_time=30.0,
    total_uuid='total_counter',
    reject_uuid='reject_counter',
)

Initialize OEE calculator.

Parameters:

Name Type Description Default
dataframe DataFrame

Input DataFrame with timeseries data.

required
time_column str

Name of timestamp column (default: 'systime').

'systime'

calculate_availability ¤

calculate_availability(
    run_state_uuid: str,
    planned_time_hours: Optional[float] = None,
    *,
    value_column: str = "value_bool"
) -> pd.DataFrame

Calculate availability percentage from run/idle intervals.

Availability = run_time / planned_time. When planned_time_hours is None the planned time is derived from the first-to-last timestamp span for each day.

Parameters:

Name Type Description Default
run_state_uuid str

UUID of the boolean run-state signal (True = running).

required
planned_time_hours Optional[float]

Fixed planned production hours per day. If None, the time span covered by data each day is used.

None
value_column str

Column holding the boolean state.

'value_bool'

Returns:

Type Description
DataFrame

DataFrame with columns:

  • date
  • run_seconds
  • planned_seconds
  • availability_pct

calculate_performance ¤

calculate_performance(
    counter_uuid: str,
    ideal_cycle_time: float,
    run_state_uuid: Optional[str] = None,
    *,
    value_column: str = "value_integer",
    run_value_column: str = "value_bool"
) -> pd.DataFrame

Calculate performance percentage (actual vs ideal throughput).

Performance = (actual_parts * ideal_cycle_time) / run_time. If run_state_uuid is None, the total time span per day is used as run time.

Parameters:

Name Type Description Default
counter_uuid str

UUID of the monotonic part counter.

required
ideal_cycle_time float

Ideal cycle time in seconds per part.

required
run_state_uuid Optional[str]

Optional UUID for run-state to compute actual run time.

None
value_column str

Column holding counter values.

'value_integer'
run_value_column str

Column holding boolean run state.

'value_bool'

Returns:

Type Description
DataFrame

DataFrame with columns:

  • date
  • actual_parts
  • ideal_parts
  • run_seconds
  • performance_pct

calculate_quality ¤

calculate_quality(
    total_uuid: str,
    reject_uuid: str,
    *,
    value_column: str = "value_integer"
) -> pd.DataFrame

Calculate quality percentage (good parts / total parts).

Parameters:

Name Type Description Default
total_uuid str

UUID of the total-parts counter.

required
reject_uuid str

UUID of the reject-parts counter.

required
value_column str

Column holding counter values.

'value_integer'

Returns:

Type Description
DataFrame

DataFrame with columns:

  • date
  • total_parts
  • reject_parts
  • good_parts
  • quality_pct

calculate_oee ¤

calculate_oee(
    run_state_uuid: str,
    counter_uuid: str,
    ideal_cycle_time: float,
    total_uuid: Optional[str] = None,
    reject_uuid: Optional[str] = None,
    *,
    planned_time_hours: Optional[float] = None
) -> pd.DataFrame

Calculate daily OEE = Availability * Performance * Quality.

When total_uuid / reject_uuid are not provided, quality is assumed to be 100%.

Parameters:

Name Type Description Default
run_state_uuid str

UUID of the boolean run-state signal.

required
counter_uuid str

UUID of the monotonic part counter.

required
ideal_cycle_time float

Ideal cycle time in seconds per part.

required
total_uuid Optional[str]

Optional UUID of total-parts counter (for quality).

None
reject_uuid Optional[str]

Optional UUID of reject-parts counter (for quality).

None
planned_time_hours Optional[float]

Fixed planned production hours per day.

None

Returns:

Type Description
DataFrame

DataFrame with columns:

  • date
  • availability_pct
  • performance_pct
  • quality_pct
  • oee_pct
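The combination step itself is just a product of the three component ratios. A one-function sketch of that arithmetic (`oee_pct` here is illustrative, not this class's API):

```python
def oee_pct(availability_pct: float, performance_pct: float, quality_pct: float) -> float:
    """OEE is the product of the three component ratios, expressed as a percentage."""
    return round(
        (availability_pct / 100) * (performance_pct / 100) * (quality_pct / 100) * 100, 1
    )
```

Because the components multiply, modest losses compound quickly: three components at 90% each already drop OEE below 73%.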

AlarmManagementEvents ¤

AlarmManagementEvents(
    dataframe: DataFrame,
    alarm_uuid: str,
    *,
    event_uuid: str = "prod:alarm",
    value_column: str = "value_bool",
    time_column: str = "systime"
)

Bases: Base

Analyze alarm signals following ISA-18.2 alarm management principles.

Works with boolean alarm signals where True = alarm active, False = alarm cleared. Provides metrics for alarm rationalisation and nuisance alarm detection.

Example usage

alarms = AlarmManagementEvents(df, alarm_uuid='temp_high_alarm')

freq = alarms.alarm_frequency(window='1h')
stats = alarms.alarm_duration_stats()
chatter = alarms.chattering_detection(min_transitions=5, window='10m')
standing = alarms.standing_alarms(min_duration='1h')

Initialize alarm management analyser.

Parameters:

Name Type Description Default
dataframe DataFrame

Input DataFrame with timeseries data.

required
alarm_uuid str

UUID of the alarm signal.

required
event_uuid str

UUID to tag derived events with.

'prod:alarm'
value_column str

Column holding the boolean alarm state.

'value_bool'
time_column str

Name of timestamp column.

'systime'

alarm_frequency ¤

alarm_frequency(window: str = '1h') -> pd.DataFrame

Count alarm activations per time window.

An activation is a transition from False to True.

Parameters:

Name Type Description Default
window str

Pandas offset alias for the aggregation window.

'1h'

Returns:

Type Description
DataFrame

DataFrame with columns:

  • window_start
  • alarm_count
  • uuid
  • source_uuid
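The False→True edge detection behind an activation count can be expressed with a one-sample shift. A sketch assuming a clean boolean series on a datetime index:

```python
import pandas as pd

alarm = pd.Series(
    [False, True, True, False, True, False],
    index=pd.to_datetime([
        "2024-01-01 08:00", "2024-01-01 08:05", "2024-01-01 08:20",
        "2024-01-01 08:40", "2024-01-01 09:10", "2024-01-01 09:30",
    ]),
)

# An activation is a False→True edge: True now, not True one sample earlier.
activations = alarm & ~alarm.shift(fill_value=False)

# Activations per hour window.
per_hour = activations.resample("1h").sum()
```

Note the repeated True at 08:20 is not counted again; only edges count, which is what distinguishes activation frequency from alarm duty.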

alarm_duration_stats ¤

alarm_duration_stats() -> pd.DataFrame

Compute min / avg / max / total duration of alarm-ON states.

Returns:

Type Description
DataFrame

DataFrame with a single row containing:

  • source_uuid
  • alarm_count
  • min_duration_seconds
  • avg_duration_seconds
  • max_duration_seconds
  • total_duration_seconds

chattering_detection ¤

chattering_detection(
    min_transitions: int = 5, window: str = "10m"
) -> pd.DataFrame

Detect chattering alarms (too many transitions in a short window).

A chattering alarm is one that toggles on/off rapidly, which is a nuisance and masks real alarms.

Parameters:

Name Type Description Default
min_transitions int

Minimum state changes within window to flag.

5
window str

Rolling window size (Pandas offset alias).

'10m'

Returns:

Type Description
DataFrame

DataFrame with columns:

  • window_start
  • window_end
  • transition_count
  • uuid
  • source_uuid

standing_alarms ¤

standing_alarms(min_duration: str = '1h') -> pd.DataFrame

Identify alarms that stay active longer than min_duration.

Standing (stale) alarms reduce operator trust and should be investigated for shelving or re-engineering.

Parameters:

Name Type Description Default
min_duration str

Minimum ON duration to flag (Pandas offset alias).

'1h'

Returns:

Type Description
DataFrame

DataFrame with columns:

  • start
  • end
  • duration_seconds
  • uuid
  • source_uuid

BatchTrackingEvents ¤

BatchTrackingEvents(
    dataframe: DataFrame,
    batch_uuid: str,
    *,
    event_uuid: str = "prod:batch",
    value_column: str = "value_string",
    time_column: str = "systime"
)

Bases: Base

Track batch/recipe production from a batch-ID string signal.

A "batch" is defined as a contiguous period where the batch-ID value remains constant. A new batch begins when the value changes.

Example usage

batches = BatchTrackingEvents(df, batch_uuid='batch_id_signal')

detected = batches.detect_batches()
stats = batches.batch_duration_stats()
yields = batches.batch_yield('part_counter')
transitions = batches.batch_transition_matrix()

Initialize batch tracker.

Parameters:

Name Type Description Default
dataframe DataFrame

Input DataFrame with timeseries data.

required
batch_uuid str

UUID of the batch-ID signal.

required
event_uuid str

UUID to tag derived events with.

'prod:batch'
value_column str

Column holding the batch ID string.

'value_string'
time_column str

Name of timestamp column.

'systime'

detect_batches ¤

detect_batches() -> pd.DataFrame

Detect batch start/end from value changes in the batch-ID signal.

Returns:

Type Description
DataFrame

DataFrame with columns:

  • batch_id: The batch identifier string.
  • start: Timestamp of the first sample in the batch.
  • end: Timestamp of the last sample in the batch.
  • duration_seconds: Duration of the batch.
  • sample_count: Number of data points in the batch.
  • uuid: Event UUID.
  • source_uuid: Source signal UUID.
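The contiguous-run definition above maps to a standard pandas idiom: compare the series with a shifted copy and cumulative-sum the change flags to number the runs. A sketch (start/end timestamps would come from grouping the datetime index the same way):

```python
import pandas as pd

batch_id = pd.Series(
    ["B1", "B1", "B2", "B2", "B2", "B1"],
    index=pd.to_datetime([
        "2024-01-01 08:00", "2024-01-01 08:30", "2024-01-01 09:00",
        "2024-01-01 09:30", "2024-01-01 10:00", "2024-01-01 10:30",
    ]),
)

# Each value change starts a new run; cumsum over the change flags numbers the runs.
run = (batch_id != batch_id.shift()).cumsum()

batches = (
    batch_id.groupby(run)
    .agg(batch_id="first", sample_count="size")
    .reset_index(drop=True)
)
```

The trailing B1 correctly becomes a third batch rather than merging with the first B1 run, which is exactly the "contiguous period" semantics described above.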

batch_duration_stats ¤

batch_duration_stats() -> pd.DataFrame

Compute duration statistics grouped by batch type (batch_id).

Returns:

Type Description
DataFrame

DataFrame with columns:

  • batch_id
  • count: Number of occurrences of this batch type.
  • min_duration_seconds
  • avg_duration_seconds
  • max_duration_seconds
  • total_duration_seconds

batch_yield ¤

batch_yield(
    counter_uuid: str,
    *,
    value_column_counter: str = "value_integer"
) -> pd.DataFrame

Compute production quantity for each detected batch.

Uses a monotonic counter signal. The yield per batch is the counter increase during the batch interval.

Parameters:

Name Type Description Default
counter_uuid str

UUID of the monotonic part counter.

required
value_column_counter str

Column holding counter values.

'value_integer'

Returns:

Type Description
DataFrame

DataFrame with columns:

  • batch_id
  • start
  • end
  • duration_seconds
  • quantity: Parts produced during the batch.
  • uuid
  • source_uuid

batch_transition_matrix ¤

batch_transition_matrix() -> pd.DataFrame

Build a transition frequency matrix: which batch follows which.

Returns:

Type Description
DataFrame

DataFrame (pivot table) where index = from_batch, columns = to_batch, and
values = transition count. An extra total column is appended.
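A pivot of this shape can be built with `pd.crosstab` over consecutive batch pairs. A sketch with a sample sequence (not necessarily how this method is implemented):

```python
import pandas as pd

seq = pd.Series(["B1", "B2", "B1", "B2", "B3"])  # batch order as detected

# Pair each batch with its successor; crosstab counts the (from, to) pairs.
matrix = pd.crosstab(
    seq.rename("from_batch"),
    seq.shift(-1).rename("to_batch"),
)
matrix["total"] = matrix.sum(axis=1)
```

The last batch has no successor, so its pair contains NaN and is dropped by crosstab automatically.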

BottleneckDetectionEvents ¤

BottleneckDetectionEvents(
    dataframe: DataFrame,
    *,
    time_column: str = "systime",
    event_uuid: str = "prod:bottleneck",
    value_column: str = "value_bool"
)

Bases: Base

Production: Bottleneck Detection

Identify which station constrains a production line by analyzing utilization of multiple boolean run-state signals.

Methods:

  • station_utilization: Per-station uptime percentage per window.
  • detect_bottleneck: Identify the bottleneck station per window.
  • shifting_bottleneck: Track when the bottleneck moves between stations.
  • throughput_constraint_summary: Summary statistics across all stations.

station_utilization ¤

station_utilization(
    station_uuids: List[str], window: str = "1h"
) -> pd.DataFrame

Per-station uptime percentage per time window.

Parameters:

Name Type Description Default
station_uuids List[str]

List of UUID strings identifying station run-state signals.

required
window str

Resample window (e.g. '1h', '30m').

'1h'

Returns:

Type Description
DataFrame

DataFrame with columns: window_start, uuid, utilization_pct.

detect_bottleneck ¤

detect_bottleneck(
    station_uuids: List[str], window: str = "1h"
) -> pd.DataFrame

Identify the bottleneck station per window.

The bottleneck is the station with the highest utilization: as the constraint, it is almost never idle while the other stations wait on it.

Parameters:

Name Type Description Default
station_uuids List[str]

List of station run-state UUIDs.

required
window str

Resample window.

'1h'

Returns:

Type Description
DataFrame

DataFrame with columns: window_start, window_end, bottleneck_uuid, utilization_pct.

shifting_bottleneck ¤

shifting_bottleneck(
    station_uuids: List[str], window: str = "1h"
) -> pd.DataFrame

Track when the bottleneck identity changes between stations.

Parameters:

Name Type Description Default
station_uuids List[str]

List of station run-state UUIDs.

required
window str

Resample window.

'1h'

Returns:

Type Description
DataFrame

DataFrame with columns: systime, from_uuid, to_uuid, previous_utilization, new_utilization.

throughput_constraint_summary ¤

throughput_constraint_summary(
    station_uuids: List[str], window: str = "1h"
) -> Dict[str, Any]

Summary statistics for bottleneck analysis.

Parameters:

Name Type Description Default
station_uuids List[str]

List of station run-state UUIDs.

required
window str

Resample window.

'1h'

Returns:

Type Description
Dict[str, Any]

Dict with: bottleneck_counts, bottleneck_percentages, most_frequent_bottleneck, avg_utilization_per_station.

MicroStopEvents ¤

MicroStopEvents(
    dataframe: DataFrame,
    run_state_uuid: str,
    *,
    event_uuid: str = "prod:micro_stop",
    value_column: str = "value_bool",
    time_column: str = "systime"
)

Bases: Base

Production: Micro-Stop Detection

Detect brief idle intervals that individually seem harmless but accumulate into significant availability losses.

Methods:

  • detect_micro_stops: Find idle intervals shorter than max_duration.
  • micro_stop_frequency: Count micro-stops per time window.
  • micro_stop_impact: Time lost to micro-stops per window.
  • micro_stop_patterns: Group micro-stops by hour-of-day.

detect_micro_stops ¤

detect_micro_stops(
    max_duration: str = "30s", min_duration: str = "0s"
) -> pd.DataFrame

Find idle intervals shorter than max_duration.

Parameters:

Name Type Description Default
max_duration str

Maximum duration to qualify as a micro-stop.

'30s'
min_duration str

Minimum duration to include (filter very short glitches).

'0s'

Returns:

Type Description
DataFrame

DataFrame with columns: start_time, end_time, duration, preceding_run_duration.

micro_stop_frequency ¤

micro_stop_frequency(
    window: str = "1h", max_duration: str = "30s"
) -> pd.DataFrame

Count micro-stops per time window.

Parameters:

Name Type Description Default
window str

Resample window.

'1h'
max_duration str

Maximum idle duration to qualify as micro-stop.

'30s'

Returns:

Type Description
DataFrame

DataFrame with columns: window_start, count, total_lost_time, pct_of_window.

micro_stop_impact ¤

micro_stop_impact(
    window: str = "1h", max_duration: str = "30s"
) -> pd.DataFrame

Time lost to micro-stops vs total available time per window.

Parameters:

Name Type Description Default
window str

Resample window.

'1h'
max_duration str

Maximum idle duration to qualify as micro-stop.

'30s'

Returns:

Type Description
DataFrame

DataFrame with columns: window_start, total_run_time, total_micro_stop_time, availability_loss_pct.

micro_stop_patterns ¤

micro_stop_patterns(
    hour_grouping: bool = True, max_duration: str = "30s"
) -> pd.DataFrame

Group micro-stops by hour-of-day to find clustering patterns.

Parameters:

Name Type Description Default
hour_grouping bool

If True, group by hour. If False, group by shift (8h blocks).

True
max_duration str

Maximum idle duration to qualify as micro-stop.

'30s'

Returns:

Type Description
DataFrame

DataFrame with columns: hour (or shift), avg_count, avg_lost_time.

DutyCycleEvents ¤

DutyCycleEvents(
    dataframe: DataFrame,
    signal_uuid: str,
    *,
    event_uuid: str = "prod:duty_cycle",
    value_column: str = "value_bool",
    time_column: str = "systime"
)

Bases: Base

Production: Duty Cycle Analysis

Analyze on/off patterns from a boolean signal: duty cycle percentage, interval listing, transition counts, and excessive cycling detection.

Methods:
  - duty_cycle_per_window: On-time percentage per time window.
  - on_off_intervals: List every on and off interval with duration.
  - cycle_count: Number of on/off transitions per window.
  - excessive_cycling: Flag windows with too many transitions.
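The duty-cycle idea can be sketched with plain pandas, assuming a regularly sampled boolean signal (the library's methods weight by actual interval durations; sample counting is a simplification that only holds at a constant sample rate):

```python
import pandas as pd

# Hypothetical on/off signal sampled every 10 minutes.
df = pd.DataFrame({
    "systime": pd.date_range("2024-01-01", periods=12, freq="10min"),
    "value_bool": [True, True, False, True, True, True,
                   False, False, True, True, True, False],
})

duty = (
    df.set_index("systime")["value_bool"]
      .resample("1h")
      .mean()            # fraction of samples that are True per window
      .mul(100)
      .rename("duty_cycle_pct")
      .reset_index()
)
```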

on_off_intervals ¤

on_off_intervals() -> pd.DataFrame

List every on and off interval with duration.

Returns:

Type Description
DataFrame

DataFrame with columns: start_time, end_time, state (on/off), duration.

duty_cycle_per_window ¤

duty_cycle_per_window(window: str = '1h') -> pd.DataFrame

Percentage of time the signal is True per window.

Parameters:

Name Type Description Default
window str

Resample window.

'1h'

Returns:

Type Description
DataFrame

DataFrame with columns: window_start, on_time, off_time, duty_cycle_pct.

cycle_count ¤

cycle_count(window: str = '1h') -> pd.DataFrame

Number of on/off transitions per window.

Parameters:

Name Type Description Default
window str

Resample window.

'1h'

Returns:

Type Description
DataFrame

DataFrame with columns: window_start, on_count, off_count, total_transitions.

excessive_cycling ¤

excessive_cycling(
    max_transitions: int = 20, window: str = "1h"
) -> pd.DataFrame

Flag windows where transition count exceeds threshold.

Excessive cycling indicates hunting, instability, or wear risk.

Parameters:

Name Type Description Default
max_transitions int

Threshold for flagging.

20
window str

Resample window.

'1h'

Returns:

Type Description
DataFrame

DataFrame with columns: window_start, transition_count, avg_on_duration, avg_off_duration.

ValueTraceabilityEvents ¤

ValueTraceabilityEvents(
    dataframe: DataFrame,
    station_uuids: Dict[str, str],
    *,
    event_uuid: str = "prod:value_trace",
    value_column: str = "value_string",
    time_column: str = "systime"
)

Bases: Base

Trace a shared identifier across multiple stations.

Each station has its own UUID that carries a string value (the identifier currently being processed — serial number, order ID, batch code, etc.). This module detects when a given identifier appears at each station and builds a timeline.

Example usage::

    trace = ValueTraceabilityEvents(
        df,
        station_uuids={
            'station_a_uuid': 'Station A',
            'station_b_uuid': 'Station B',
            'station_c_uuid': 'Station C',
        },
    )

    # Full timeline of every identifier across all stations
    timeline = trace.build_timeline()

    # Lead time from first to last station
    lead = trace.lead_time()

    # Where is each identifier right now?
    status = trace.current_status()

    # Station dwell-time statistics
    dwell = trace.station_dwell_statistics()

Initialize value traceability.

Parameters:

Name Type Description Default
dataframe DataFrame

Input DataFrame with timeseries data.

required
station_uuids Dict[str, str]

Mapping of UUID -> human-readable station name. e.g. {'uuid_abc': 'Station A', 'uuid_def': 'Station B'}

required
event_uuid str

UUID to tag derived events with.

'prod:value_trace'
value_column str

Column holding the identifier string value.

'value_string'
time_column str

Name of timestamp column.

'systime'

build_timeline ¤

build_timeline() -> pd.DataFrame

Build a full timeline of every identifier at every station.

Returns:

Type Description
DataFrame

DataFrame with columns:
  • identifier: The shared identifier string.
  • station_uuid: UUID of the station signal.
  • station_name: Human-readable station name.
  • start: Timestamp when the identifier appeared at the station.
  • end: Timestamp of the last sample at the station.
  • duration_seconds: Time spent at the station.
  • sample_count: Number of data points.
  • station_sequence: Order of visit (1-based) per identifier.
  • uuid: Event UUID.

lead_time ¤

lead_time() -> pd.DataFrame

Compute end-to-end lead time per identifier.

Returns:

Type Description
DataFrame

DataFrame with columns:
  • identifier
  • first_station: Name of first station visited.
  • last_station: Name of last station visited.
  • first_seen: Earliest timestamp across all stations.
  • last_seen: Latest timestamp across all stations.
  • lead_time_seconds: Total time from first seen to last seen.
  • stations_visited: Number of distinct stations visited.
  • station_path: Arrow-separated ordered station names.
  • uuid: Event UUID.

current_status ¤

current_status() -> pd.DataFrame

Determine the last-known station for each identifier.

Returns:

Type Description
DataFrame

DataFrame with columns:
  • identifier
  • current_station: Name of the last station where seen.
  • current_station_uuid: UUID of that station.
  • arrived_at: When the identifier arrived at the current station.
  • time_at_station_seconds: Dwell time at the current station so far.
  • uuid: Event UUID.

station_dwell_statistics ¤

station_dwell_statistics() -> pd.DataFrame

Compute dwell-time statistics per station (across all identifiers).

Returns:

Type Description
DataFrame

DataFrame with columns:
  • station_name
  • station_uuid
  • identifier_count: Number of distinct identifiers seen.
  • min_dwell_seconds
  • avg_dwell_seconds
  • max_dwell_seconds
  • total_dwell_seconds

RoutingTraceabilityEvents ¤

RoutingTraceabilityEvents(
    dataframe: DataFrame,
    id_uuid: str,
    routing_uuid: str,
    *,
    state_map: Optional[
        Dict[Union[int, float, str], str]
    ] = None,
    station_map: Optional[
        Dict[Union[int, float, str], str]
    ] = None,
    event_uuid: str = "prod:routing_trace",
    id_value_column: str = "value_string",
    routing_value_column: str = "value_integer",
    time_column: str = "systime"
)

Bases: Base

Trace item routing using an ID signal paired with a state/routing signal.

Two UUIDs work together:
  - id_uuid: string signal carrying the current identifier (serial number, order ID, batch code, etc.).
  - routing_uuid: signal whose value encodes the current process step or station.

A state_map translates signal values to human-readable station or step names. Without it, raw values are used as labels.

Example usage::

# PLC step numbers mapped to process steps
trace = RoutingTraceabilityEvents(
    df,
    id_uuid='serial_code_signal',
    routing_uuid='step_chain_signal',
    state_map={
        10: 'Heating',
        20: 'Holding',
        30: 'Cooling',
        40: 'Discharge',
    },
)

# Station handover signal mapped to stations
trace = RoutingTraceabilityEvents(
    df,
    id_uuid='serial_code_signal',
    routing_uuid='handover_signal',
    state_map={1: 'Welding', 2: 'Painting', 3: 'Assembly'},
)

timeline = trace.build_routing_timeline()
lead = trace.lead_time()
stats = trace.station_statistics()
paths = trace.routing_paths()

Initialize routing traceability.

Parameters:

Name Type Description Default
dataframe DataFrame

Input DataFrame with timeseries data.

required
id_uuid str

UUID of the signal carrying item identifiers.

required
routing_uuid str

UUID of the state/routing signal.

required
state_map Optional[Dict[Union[int, float, str], str]]

Mapping from signal value to station/step name. e.g. {10: 'Heating', 20: 'Holding', 30: 'Cooling'} or {1: 'Welding', 2: 'Painting', 3: 'Assembly'} Values can be int, float, or str keys. If not provided, raw signal values are used as labels.

None
station_map Optional[Dict[Union[int, float, str], str]]

Deprecated alias for state_map (for backwards compatibility). Use state_map instead.

None
event_uuid str

UUID to tag derived events with.

'prod:routing_trace'
id_value_column str

Column holding the item ID string.

'value_string'
routing_value_column str

Column holding the state/routing value.

'value_integer'
time_column str

Name of timestamp column.

'systime'

build_routing_timeline ¤

build_routing_timeline() -> pd.DataFrame

Correlate the ID signal with the state/routing signal to build a timeline.

For each sample of the routing signal, the current item ID is determined via backward-fill merge (most recent ID at that timestamp). Contiguous intervals where the same (item_id, state) pair holds are grouped into single events.
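The backward-fill merge described above can be sketched with `pd.merge_asof` and illustrative data (the signal names and values are assumptions, not the library's internals): each routing sample picks up the most recent item ID at or before its timestamp.

```python
import pandas as pd

# Hypothetical ID samples and routing samples.
ids = pd.DataFrame({
    "systime": pd.to_datetime(["2024-01-01 08:00", "2024-01-01 08:30"]),
    "value_string": ["SN-001", "SN-002"],
})
routing = pd.DataFrame({
    "systime": pd.to_datetime([
        "2024-01-01 08:05", "2024-01-01 08:20", "2024-01-01 08:35",
    ]),
    "value_integer": [10, 20, 10],
})

merged = pd.merge_asof(
    routing.sort_values("systime"),
    ids.sort_values("systime"),
    on="systime",
    direction="backward",  # most recent ID at or before each sample
)
```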

Returns:

Type Description
DataFrame

DataFrame with columns:
  • item_id: Identifier string.
  • routing_value: Raw signal value.
  • station_name: Resolved station/step name from state_map.
  • start: First timestamp of interval.
  • end: Last timestamp of interval.
  • duration_seconds: Time at this state.
  • sample_count: Number of routing samples in interval.
  • station_sequence: Visit order per item (1-based).
  • uuid: Event UUID.

lead_time ¤

lead_time() -> pd.DataFrame

Compute end-to-end lead time per item.

Returns:

Type Description
DataFrame

DataFrame with columns:
  • item_id
  • first_station: Name of first station/step visited.
  • last_station: Name of last station/step visited.
  • first_seen: Earliest timestamp.
  • last_seen: Latest timestamp.
  • lead_time_seconds: Total elapsed time.
  • stations_visited: Number of distinct stations/steps.
  • routing_path: Ordered station names joined by " -> ".
  • uuid: Event UUID.

station_statistics ¤

station_statistics() -> pd.DataFrame

Compute dwell-time statistics per station/step across all items.

Returns:

Type Description
DataFrame

DataFrame with columns:
  • station_name
  • routing_value
  • item_count: Number of distinct items seen.
  • min_dwell_seconds
  • avg_dwell_seconds
  • max_dwell_seconds
  • total_dwell_seconds

routing_paths ¤

routing_paths() -> pd.DataFrame

Analyze routing path frequencies -- which station sequences are most common.

Returns:

Type Description
DataFrame

DataFrame with columns:
  • routing_path: Ordered station names joined by " -> ".
  • item_count: Number of items that followed this path.
  • avg_lead_time_seconds: Average lead time for items on this path.
  • min_lead_time_seconds
  • max_lead_time_seconds

MultiProcessTraceabilityEvents ¤

MultiProcessTraceabilityEvents(
    dataframe: DataFrame,
    processes: List[Dict[str, str]],
    *,
    handovers: Optional[List[Dict[str, str]]] = None,
    event_uuid: str = "prod:multi_process_trace",
    id_value_column: str = "value_string",
    handover_value_column: str = "value_integer",
    time_column: str = "systime"
)

Bases: Base

Trace items across a multi-station topology with parallel paths.

Example usage::

trace = MultiProcessTraceabilityEvents(
    df,
    processes=[
        {"id_uuid": "serial_weld1",  "station": "Welding Cell 1"},
        {"id_uuid": "serial_weld2",  "station": "Welding Cell 2"},
        {"id_uuid": "serial_paint",  "station": "Painting"},
        {"id_uuid": "serial_assy",   "station": "Assembly"},
    ],
    handovers=[
        {"uuid": "ho_w1_paint", "from_station": "Welding Cell 1", "to_station": "Painting"},
        {"uuid": "ho_w2_paint", "from_station": "Welding Cell 2", "to_station": "Painting"},
        {"uuid": "ho_paint_assy", "from_station": "Painting", "to_station": "Assembly"},
    ],
)

timeline = trace.build_timeline()
lead = trace.lead_time()
parallel = trace.parallel_activity()
handover_log = trace.handover_log()
paths = trace.routing_paths()
station_stats = trace.station_statistics()

Initialize multi-process traceability.

Parameters:

Name Type Description Default
dataframe DataFrame

Input DataFrame with timeseries data.

required
processes List[Dict[str, str]]

List of process definitions, each a dict with:
  - id_uuid: UUID of the signal carrying item IDs at this station.
  - station: Human-readable station name.

required
handovers Optional[List[Dict[str, str]]]

Optional list of handover signal definitions, each a dict with:
  - uuid: UUID of the handover signal.
  - from_station: Station name the item leaves.
  - to_station: Station name the item arrives at.
A handover signal value > 0 (or True) indicates a transfer event.

None
event_uuid str

UUID to tag derived events with.

'prod:multi_process_trace'
id_value_column str

Column holding item ID strings in process signals.

'value_string'
handover_value_column str

Column holding handover trigger values.

'value_integer'
time_column str

Name of timestamp column.

'systime'

build_timeline ¤

build_timeline() -> pd.DataFrame

Build a full timeline of every item at every station.

Handles parallel stations: the same item may appear at overlapping time intervals at different stations (concurrent processing), or different items at parallel cells of the same station type.
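The overlap test behind the `is_parallel` flag can be sketched with made-up intervals: two station visits for the same item overlap when each starts before the other ends.

```python
import pandas as pd

# Hypothetical visit intervals at two stations for the same item.
a_start, a_end = pd.Timestamp("2024-01-01 08:00"), pd.Timestamp("2024-01-01 08:30")
b_start, b_end = pd.Timestamp("2024-01-01 08:20"), pd.Timestamp("2024-01-01 08:50")

# Standard interval-overlap check.
overlaps = (a_start < b_end) and (b_start < a_end)
overlap_seconds = (
    (min(a_end, b_end) - max(a_start, b_start)).total_seconds()
    if overlaps else 0.0
)
```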

Returns:

Type Description
DataFrame

DataFrame with columns:
  • item_id: Order / serial number.
  • station: Human-readable station name.
  • id_uuid: Source UUID for this station's ID signal.
  • start: Interval start.
  • end: Interval end.
  • duration_seconds: Time at this station.
  • sample_count: Number of data points.
  • step_sequence: Visit order per item (1-based, by start time).
  • is_parallel: True if this interval overlaps with another station for the same item.
  • uuid: Event UUID.

lead_time ¤

lead_time() -> pd.DataFrame

Compute end-to-end lead time per item across all processes.

Returns:

Type Description
DataFrame

DataFrame with columns:
  • item_id
  • first_station: First station visited.
  • last_station: Last station visited.
  • first_seen: Earliest timestamp.
  • last_seen: Latest timestamp.
  • lead_time_seconds: Total elapsed time.
  • stations_visited: Number of distinct stations.
  • station_path: Ordered station names (" -> ").
  • has_parallel: Whether item had parallel processing.
  • uuid: Event UUID.

parallel_activity ¤

parallel_activity() -> pd.DataFrame

Find items that were processed at multiple stations simultaneously.

Returns:

Type Description
DataFrame

DataFrame with columns:
  • item_id
  • station_a: First overlapping station.
  • station_b: Second overlapping station.
  • overlap_start: Start of overlap period.
  • overlap_end: End of overlap period.
  • overlap_seconds: Duration of overlap.
  • uuid: Event UUID.

handover_log ¤

handover_log() -> pd.DataFrame

Extract handover events and correlate with item IDs.

For each handover signal, detects trigger points (value > 0 or value changes to a truthy state) and resolves which item was being transferred based on the from-station's ID signal at that time.
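A sketch of that correlation with illustrative data (signal names and values are assumptions): trigger samples are filtered, then resolved to the most recent ID on the from-station's signal via a backward merge.

```python
import pandas as pd

# Hypothetical handover trigger and from-station ID samples.
handover = pd.DataFrame({
    "systime": pd.to_datetime([
        "2024-01-01 08:00", "2024-01-01 08:10", "2024-01-01 08:20",
    ]),
    "value_integer": [0, 1, 0],
})
from_station_ids = pd.DataFrame({
    "systime": pd.to_datetime(["2024-01-01 07:55"]),
    "value_string": ["SN-042"],
})

# Keep only samples where the trigger fired, then resolve the item.
triggers = handover[handover["value_integer"] > 0]
resolved = pd.merge_asof(
    triggers, from_station_ids, on="systime", direction="backward"
)
```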

Returns:

Type Description
DataFrame

DataFrame with columns:
  • timestamp: When the handover fired.
  • item_id: Item being transferred (from from_station's ID signal).
  • from_station: Source station.
  • to_station: Destination station.
  • handover_uuid: UUID of the handover signal.
  • handover_value: Raw value of the handover signal.
  • uuid: Event UUID.

station_statistics ¤

station_statistics() -> pd.DataFrame

Compute dwell-time statistics per station across all items.

Distinguishes parallel cells of the same station type.

Returns:

Type Description
DataFrame

DataFrame with columns:
  • station: Station name.
  • id_uuid: Specific cell UUID (for parallel cells).
  • item_count: Distinct items processed.
  • min_dwell_seconds
  • avg_dwell_seconds
  • max_dwell_seconds
  • total_dwell_seconds

routing_paths ¤

routing_paths() -> pd.DataFrame

Analyze routing path frequencies across all items.

Returns:

Type Description
DataFrame

DataFrame with columns:
  • station_path: Ordered station names (" -> ").
  • item_count: Items that followed this path.
  • avg_lead_time_seconds
  • min_lead_time_seconds
  • max_lead_time_seconds
  • has_parallel_steps: Whether path includes parallel processing.

SetupTimeTracking ¤

SetupTimeTracking(
    dataframe: DataFrame,
    *,
    time_column: str = "systime",
    shift_definitions: Optional[
        Dict[str, tuple[str, str]]
    ] = None
)

Bases: Base

Track and analyze setup/changeover durations.

Each UUID represents one signal:
  - state_uuid: machine state signal containing a setup indicator value
  - part_id_uuid: part number / product type signal (optional)

Merge keys: [date, shift] for shift-level, [period] for trend data, [from_product, to_product] for product transition analysis.

Pipeline example::

setup = SetupTimeTracking(df)
durations = setup.setup_durations('machine_state', setup_value='Setup')
# → merge with ShiftReporting.shift_production() on [date, shift]
stats = setup.setup_statistics('machine_state')
# → standalone KPI reporting
Example usage::

    tracker = SetupTimeTracking(df)

    # List all setup events
    events = tracker.setup_durations(state_uuid='machine_state')

    # Setup time by product transition
    by_product = tracker.setup_by_product(
        state_uuid='machine_state',
        part_id_uuid='part_number',
    )

setup_durations ¤

setup_durations(
    state_uuid: str,
    *,
    setup_value: str = "Setup",
    value_column: str = "value_string"
) -> pd.DataFrame

List every setup event with duration.
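The interval extraction can be sketched with plain pandas on made-up data: contiguous runs of the setup value are grouped and timed. A real implementation would extend each interval to the next state change; for brevity this sketch uses the last sample of the run as end_time.

```python
import pandas as pd

# Hypothetical machine-state samples.
df = pd.DataFrame({
    "systime": pd.date_range("2024-01-01 06:00", periods=6, freq="15min"),
    "value_string": ["Run", "Setup", "Setup", "Run", "Setup", "Run"],
})

is_setup = df["value_string"].eq("Setup")
run_id = is_setup.ne(is_setup.shift()).cumsum()   # label contiguous runs

setups = (
    df[is_setup]
      .groupby(run_id[is_setup])
      .agg(start_time=("systime", "first"), end_time=("systime", "last"))
      .reset_index(drop=True)
)
setups["duration_minutes"] = (
    (setups["end_time"] - setups["start_time"]).dt.total_seconds() / 60
)
```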

Parameters:

Name Type Description Default
state_uuid str

UUID for machine state signal.

required
setup_value str

Value that indicates a setup state.

'Setup'
value_column str

Column containing state values.

'value_string'

Returns:

Type Description
DataFrame

DataFrame with columns:
  • start_time, end_time, duration_minutes, date, shift

setup_by_product ¤

setup_by_product(
    state_uuid: str,
    part_id_uuid: str,
    *,
    setup_value: str = "Setup",
    value_column_state: str = "value_string",
    value_column_part: str = "value_string"
) -> pd.DataFrame

Setup time statistics by product transition (from → to).

Parameters:

Name Type Description Default
state_uuid str

UUID for machine state signal.

required
part_id_uuid str

UUID for part number / product type signal.

required
setup_value str

Value indicating setup state.

'Setup'
value_column_state str

Column containing state values.

'value_string'
value_column_part str

Column containing part number values.

'value_string'

Returns:

Type Description
DataFrame

DataFrame with columns:
  • from_product, to_product, avg_minutes, min_minutes, max_minutes, count

setup_statistics ¤

setup_statistics(
    state_uuid: str,
    *,
    setup_value: str = "Setup",
    value_column: str = "value_string"
) -> pd.DataFrame

Overall setup time statistics.

Parameters:

Name Type Description Default
state_uuid str

UUID for machine state signal.

required
setup_value str

Value indicating setup state.

'Setup'
value_column str

Column containing state values.

'value_string'

Returns:

Type Description
DataFrame

DataFrame (single row) with columns:
  • total_setups, total_minutes, avg_minutes, median_minutes, std_minutes, pct_of_available_time

setup_trend ¤

setup_trend(
    state_uuid: str,
    *,
    setup_value: str = "Setup",
    value_column: str = "value_string",
    window: str = "1W"
) -> pd.DataFrame

Track setup time trend over time.

Parameters:

Name Type Description Default
state_uuid str

UUID for machine state signal.

required
setup_value str

Value indicating setup state.

'Setup'
value_column str

Column containing state values.

'value_string'
window str

Time window for aggregation (default '1W').

'1W'

Returns:

Type Description
DataFrame

DataFrame with columns:
  • period, avg_setup_minutes, setup_count, total_setup_minutes

OperatorPerformanceTracking ¤

OperatorPerformanceTracking(
    dataframe: DataFrame,
    *,
    time_column: str = "systime",
    shift_definitions: Optional[
        Dict[str, tuple[str, str]]
    ] = None
)

Bases: Base

Track and compare operator performance.

Each UUID represents one signal:
  - operator_uuid: operator or team identifier signal
  - counter_uuid: production counter signal
  - ok_uuid / nok_uuid: good/bad parts counters (optional, for quality)

Merge keys: [operator] for operator-level, [date, shift] for shift-level.

Pipeline example::

ops = OperatorPerformanceTracking(df)
by_op = ops.production_by_operator('operator_id', 'part_counter')
# → standalone KPI report
quality = ops.quality_by_operator('operator_id', 'good_parts', 'bad_parts')
# → merge with QualityTracking outputs on [date, shift]
Example usage::

    tracker = OperatorPerformanceTracking(df)

    # Parts per operator
    prod = tracker.production_by_operator(
        operator_uuid='operator_id',
        counter_uuid='part_counter',
    )

    # Efficiency vs target
    eff = tracker.operator_efficiency(
        operator_uuid='operator_id',
        counter_uuid='part_counter',
        target_per_shift=500,
    )

production_by_operator ¤

production_by_operator(
    operator_uuid: str,
    counter_uuid: str,
    *,
    value_column_operator: str = "value_string",
    value_column_counter: str = "value_integer"
) -> pd.DataFrame

Parts produced per operator.
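The attribution logic can be sketched with made-up data: counter increments are credited to the operator active at each sample (the most recent operator ID at or before the counter timestamp).

```python
import pandas as pd

# Hypothetical monotonic part counter and operator log-in samples.
counter = pd.DataFrame({
    "systime": pd.date_range("2024-01-01 06:00", periods=6, freq="1h"),
    "value_integer": [0, 120, 250, 260, 380, 500],
})
operators = pd.DataFrame({
    "systime": pd.to_datetime(["2024-01-01 06:00", "2024-01-01 09:00"]),
    "value_string": ["alice", "bob"],
})

merged = pd.merge_asof(counter, operators, on="systime", direction="backward")
merged["delta"] = merged["value_integer"].diff().clip(lower=0)  # guard counter resets
by_operator = merged.groupby("value_string")["delta"].sum()
```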

Parameters:

Name Type Description Default
operator_uuid str

UUID for operator/team identifier signal.

required
counter_uuid str

UUID for production counter signal.

required
value_column_operator str

Column containing operator names.

'value_string'
value_column_counter str

Column containing counter values.

'value_integer'

Returns:

Type Description
DataFrame

DataFrame with columns:
  • operator, total_produced, shifts_worked, avg_per_shift

operator_efficiency ¤

operator_efficiency(
    operator_uuid: str,
    counter_uuid: str,
    target_per_shift: int,
    *,
    value_column_operator: str = "value_string",
    value_column_counter: str = "value_integer"
) -> pd.DataFrame

Operator efficiency vs a per-shift target.

Parameters:

Name Type Description Default
operator_uuid str

UUID for operator/team identifier signal.

required
counter_uuid str

UUID for production counter signal.

required
target_per_shift int

Target production quantity per shift.

required
value_column_operator str

Column containing operator names.

'value_string'
value_column_counter str

Column containing counter values.

'value_integer'

Returns:

Type Description
DataFrame

DataFrame with columns:
  • operator, total_produced, target, efficiency_pct

quality_by_operator ¤

quality_by_operator(
    operator_uuid: str,
    ok_uuid: str,
    nok_uuid: str,
    *,
    value_column_operator: str = "value_string",
    value_column_counter: str = "value_integer"
) -> pd.DataFrame

Quality metrics (First Pass Yield) per operator.

Parameters:

Name Type Description Default
operator_uuid str

UUID for operator/team identifier signal.

required
ok_uuid str

UUID for good parts counter.

required
nok_uuid str

UUID for bad parts counter.

required
value_column_operator str

Column containing operator names.

'value_string'
value_column_counter str

Column containing counter values.

'value_integer'

Returns:

Type Description
DataFrame

DataFrame with columns:
  • operator, ok_count, nok_count, first_pass_yield_pct

operator_comparison ¤

operator_comparison(
    operator_uuid: str,
    counter_uuid: str,
    *,
    value_column_operator: str = "value_string",
    value_column_counter: str = "value_integer"
) -> pd.DataFrame

Ranked operator performance comparison.

Parameters:

Name Type Description Default
operator_uuid str

UUID for operator/team identifier signal.

required
counter_uuid str

UUID for production counter signal.

required
value_column_operator str

Column containing operator names.

'value_string'
value_column_counter str

Column containing counter values.

'value_integer'

Returns:

Type Description
DataFrame

DataFrame with columns:
  • operator, total_produced, rank, pct_of_best

ReworkTracking ¤

ReworkTracking(
    dataframe: DataFrame,
    *,
    time_column: str = "systime",
    shift_definitions: Optional[
        Dict[str, tuple[str, str]]
    ] = None
)

Bases: Base

Track parts that require rework (re-processing through a station).

Each UUID represents one signal:
  - rework_uuid: rework event counter (monotonic integer)
  - reason_uuid: rework reason code signal (optional)
  - total_production_uuid: total production counter (optional)
  - part_id_uuid: part number / product type signal (optional)

Merge keys: [date, shift] for shift-level, [period] for trend, [reason] for reason-level, [part_number] for part-level.

Pipeline example::

rework = ReworkTracking(df)
shift_rework = rework.rework_by_shift('rework_counter')
# → merge with ShiftReporting.shift_production() on [date, shift]
# → merge with QualityTracking.nok_by_shift() on [date, shift]
rate = rework.rework_rate('rework_counter', 'total_counter')
# → merge with ShiftReporting.shift_production() on [date, shift]
Example usage::

    tracker = ReworkTracking(df)

    # Rework per shift
    shift_rework = tracker.rework_by_shift(rework_uuid='rework_counter')

    # Rework rate
    rate = tracker.rework_rate(
        rework_uuid='rework_counter',
        total_production_uuid='total_counter',
    )

rework_by_shift ¤

rework_by_shift(
    rework_uuid: str, *, value_column: str = "value_integer"
) -> pd.DataFrame

Rework count per shift.

Parameters:

Name Type Description Default
rework_uuid str

UUID for rework event counter signal.

required
value_column str

Column containing counter values.

'value_integer'

Returns:

Type Description
DataFrame

DataFrame with columns:
  • date, shift, rework_count

rework_by_reason ¤

rework_by_reason(
    rework_uuid: str,
    reason_uuid: str,
    *,
    value_column_rework: str = "value_integer",
    value_column_reason: str = "value_string"
) -> pd.DataFrame

Rework quantity by reason code.

Parameters:

Name Type Description Default
rework_uuid str

UUID for rework counter signal.

required
reason_uuid str

UUID for rework reason code signal.

required
value_column_rework str

Column containing rework counter values.

'value_integer'
value_column_reason str

Column containing reason codes.

'value_string'

Returns:

Type Description
DataFrame

DataFrame with columns:
  • reason, rework_count, pct_of_total

rework_rate ¤

rework_rate(
    rework_uuid: str,
    total_production_uuid: str,
    *,
    value_column_rework: str = "value_integer",
    value_column_production: str = "value_integer"
) -> pd.DataFrame

Rework rate as percentage of total production per shift.

Parameters:

Name Type Description Default
rework_uuid str

UUID for rework counter signal.

required
total_production_uuid str

UUID for total production counter signal.

required
value_column_rework str

Column containing rework counter values.

'value_integer'
value_column_production str

Column containing production counter values.

'value_integer'

Returns:

Type Description
DataFrame

DataFrame with columns:
  • date, shift, total_produced, rework_count, rework_rate_pct

rework_cost ¤

rework_cost(
    rework_uuid: str,
    part_id_uuid: str,
    rework_costs: Dict[str, float],
    *,
    value_column_rework: str = "value_integer",
    value_column_part: str = "value_string"
) -> pd.DataFrame

Convert rework counts to monetary cost by part number.
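A sketch of the cost conversion with a made-up rework_costs mapping: counts are summed per part and multiplied by the per-rework cost.

```python
import pandas as pd

# Hypothetical per-event rework records.
rework = pd.DataFrame({
    "part_number": ["PART_A", "PART_B", "PART_A"],
    "rework_count": [2, 1, 3],
})
rework_costs = {"PART_A": 12.50, "PART_B": 8.75}

cost = rework.groupby("part_number", as_index=False)["rework_count"].sum()
cost["cost_per_rework"] = cost["part_number"].map(rework_costs)
cost["total_cost"] = cost["rework_count"] * cost["cost_per_rework"]
```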

Parameters:

Name Type Description Default
rework_uuid str

UUID for rework counter signal.

required
part_id_uuid str

UUID for part number / product type signal.

required
rework_costs Dict[str, float]

Dict mapping part numbers to cost per rework.

required
value_column_rework str

Column containing rework counter values.

'value_integer'
value_column_part str

Column containing part number values.

'value_string'

Returns:

Type Description
DataFrame

DataFrame with columns:
  • part_number, rework_count, cost_per_rework, total_cost

rework_trend ¤

rework_trend(
    rework_uuid: str,
    *,
    value_column: str = "value_integer",
    window: str = "1D"
) -> pd.DataFrame

Track rework count trend over time.

Parameters:

Name Type Description Default
rework_uuid str

UUID for rework counter signal.

required
value_column str

Column containing counter values.

'value_integer'
window str

Time window for aggregation (default '1D').

'1D'

Returns:

Type Description
DataFrame

DataFrame with columns:
  • period, rework_count

PerformanceLossTracking ¤

PerformanceLossTracking(
    dataframe: DataFrame,
    *,
    time_column: str = "systime",
    shift_definitions: Optional[
        Dict[str, tuple[str, str]]
    ] = None
)

Bases: Base

Track performance and speed losses against target cycle times.

Identifies hidden losses where the machine is running but slower than it should be. Typically 10-20% of production time is lost to speed losses.

Each UUID represents one signal:
  - cycle_uuid: cycle trigger or production counter
  - part_id_uuid: part number signal (optional, for per-part targets)

Merge keys: [date, shift] for shift-level, [period] for trend data.

Pipeline example::

perf = PerformanceLossTracking(df)
shift_perf = perf.performance_by_shift('cycle', target_cycle_time=45)
# → merge with DowntimeTracking.downtime_by_shift() on [date, shift]
# → merge with QualityTracking.nok_by_shift() on [date, shift]
# → feed into ShiftHandoverReport.from_shift_data()
Example usage::

    tracker = PerformanceLossTracking(df)

    # Performance by shift
    perf = tracker.performance_by_shift(
        cycle_uuid='cycle_trigger',
        target_cycle_time=45.0,
    )

    # Identify slow periods
    slow = tracker.slow_periods(
        cycle_uuid='cycle_trigger',
        target_cycle_time=45.0,
        threshold_pct=90,
    )

performance_by_shift ¤

performance_by_shift(
    cycle_uuid: str,
    target_cycle_time: float,
    *,
    value_column: str = "value_integer"
) -> pd.DataFrame

Calculate performance percentage per shift.

Performance = (target_cycle_time * actual_parts) / elapsed_run_time.
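A worked example of this formula with made-up numbers (a 45 s target cycle time, 500 parts in an 8-hour shift):

```python
target_cycle_time = 45.0        # seconds per part (ideal)
actual_parts = 500
elapsed_run_time = 8 * 3600     # seconds in the shift

# Ideal production time over actual run time, as a percentage.
performance_pct = 100 * (target_cycle_time * actual_parts) / elapsed_run_time
# Time lost to running slower than the target cycle.
loss_minutes = (elapsed_run_time - target_cycle_time * actual_parts) / 60
```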

Parameters:

Name Type Description Default
cycle_uuid str

UUID of cycle trigger or counter signal.

required
target_cycle_time float

Target (ideal) cycle time in seconds.

required
value_column str

Column with counter values.

'value_integer'

Returns:

Type Description
DataFrame

DataFrame with columns:
  • date, shift, actual_parts, avg_cycle_time_s, target_cycle_time_s, performance_pct, loss_minutes

slow_periods ¤

slow_periods(
    cycle_uuid: str,
    target_cycle_time: float,
    *,
    threshold_pct: float = 90.0,
    window: str = "1h",
    value_column: str = "value_integer"
) -> pd.DataFrame

Identify time windows where performance is below threshold.

Parameters:

Name Type Description Default
cycle_uuid str

UUID of cycle trigger or counter signal.

required
target_cycle_time float

Target cycle time in seconds.

required
threshold_pct float

Performance must be below this to flag (default 90%).

90.0
window str

Rolling window size (default '1h').

'1h'
value_column str

Column with counter values.

'value_integer'

Returns:

Type Description
DataFrame

DataFrame with columns:
  • window_start, window_end, actual_parts, avg_cycle_time_s, performance_pct, loss_minutes

performance_trend ¤

performance_trend(
    cycle_uuid: str,
    target_cycle_time: float,
    *,
    window: str = "1D",
    value_column: str = "value_integer"
) -> pd.DataFrame

Track performance trend over time.

Parameters:

Name Type Description Default
cycle_uuid str

UUID of cycle trigger or counter signal.

required
target_cycle_time float

Target cycle time in seconds.

required
window str

Time window for aggregation (default '1D').

'1D'
value_column str

Column with counter values.

'value_integer'

Returns:

Type Description
DataFrame

DataFrame with columns:

DataFrame
  • period, actual_parts, avg_cycle_time_s, performance_pct, loss_minutes

ScrapTracking ¤

ScrapTracking(
    dataframe: DataFrame,
    *,
    time_column: str = "systime",
    shift_definitions: Optional[
        Dict[str, tuple[str, str]]
    ] = None
)

Bases: Base

Track material scrap and waste.

Each UUID represents one signal:

- scrap_uuid: scrap weight or count signal
- reason_uuid: scrap reason code (optional)
- part_id_uuid: part number / material type (optional)

Merge keys: [date, shift] for shift-level, [period] for trend, [reason] for reason-level, [part_number] for part-level.

Pipeline example::

scrap = ScrapTracking(df)
shift_scrap = scrap.scrap_by_shift('scrap_weight')
# → merge with ShiftReporting.shift_production() on [date, shift]
# → merge with QualityTracking.nok_by_shift() on [date, shift]
cost = scrap.scrap_cost('scrap_weight', 'part_id', {'A': 12.5})
# → merge with QualityTracking.quality_by_part() on [part_number]
Example usage

tracker = ScrapTracking(df)

Scrap per shift¤

shift_scrap = tracker.scrap_by_shift(scrap_uuid='scrap_weight')

Scrap by reason¤

reasons = tracker.scrap_by_reason(
    scrap_uuid='scrap_weight',
    reason_uuid='scrap_reason',
)

Convert to cost¤

cost = tracker.scrap_cost(
    scrap_uuid='scrap_weight',
    part_id_uuid='part_number',
    material_costs={'PART_A': 12.50, 'PART_B': 8.75},
)

scrap_by_shift ¤

scrap_by_shift(
    scrap_uuid: str, *, value_column: str = "value_double"
) -> pd.DataFrame

Scrap quantity per shift.

Parameters:

Name Type Description Default
scrap_uuid str

UUID for scrap weight/count signal.

required
value_column str

Column containing scrap values.

'value_double'

Returns:

Type Description
DataFrame

DataFrame with columns:

DataFrame
  • date, shift, scrap_quantity

scrap_by_reason ¤

scrap_by_reason(
    scrap_uuid: str,
    reason_uuid: str,
    *,
    value_column_scrap: str = "value_double",
    value_column_reason: str = "value_string"
) -> pd.DataFrame

Scrap quantity by reason code.

Parameters:

Name Type Description Default
scrap_uuid str

UUID for scrap weight/count signal.

required
reason_uuid str

UUID for scrap reason code signal.

required
value_column_scrap str

Column containing scrap values.

'value_double'
value_column_reason str

Column containing reason codes.

'value_string'

Returns:

Type Description
DataFrame

DataFrame with columns:

DataFrame
  • reason, scrap_quantity, pct_of_total
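The pct_of_total column is each reason's share of total scrap. A self-contained sketch with hypothetical reason codes and quantities:

```python
import pandas as pd

# Sketch of the pct_of_total column: each reason's share of total scrap
reasons = pd.DataFrame({
    "reason": ["misfeed", "tool_wear", "setup"],
    "scrap_quantity": [50.0, 30.0, 20.0],
})
reasons["pct_of_total"] = (
    reasons["scrap_quantity"] / reasons["scrap_quantity"].sum() * 100
)
# → 50.0, 30.0, 20.0 percent respectively
```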

scrap_cost ¤

scrap_cost(
    scrap_uuid: str,
    part_id_uuid: str,
    material_costs: Dict[str, float],
    *,
    value_column_scrap: str = "value_double",
    value_column_part: str = "value_string"
) -> pd.DataFrame

Convert scrap quantities to monetary cost.

Parameters:

Name Type Description Default
scrap_uuid str

UUID for scrap weight/count signal.

required
part_id_uuid str

UUID for part number / material type signal.

required
material_costs Dict[str, float]

Dict mapping part numbers to cost per unit scrap.

required
value_column_scrap str

Column containing scrap values.

'value_double'
value_column_part str

Column containing part numbers.

'value_string'

Returns:

Type Description
DataFrame

DataFrame with columns:

DataFrame
  • part_number, scrap_quantity, cost_per_unit, total_cost
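The cost conversion is a lookup-and-multiply: map each part number to its unit cost, then multiply by the scrapped quantity. A sketch with hypothetical parts and costs:

```python
import pandas as pd

# Sketch of the cost conversion: map part numbers to unit costs, then multiply
scrap = pd.DataFrame({
    "part_number": ["PART_A", "PART_B"],
    "scrap_quantity": [12.0, 4.0],
})
material_costs = {"PART_A": 12.50, "PART_B": 8.75}
scrap["cost_per_unit"] = scrap["part_number"].map(material_costs)
scrap["total_cost"] = scrap["scrap_quantity"] * scrap["cost_per_unit"]
# → total_cost: 150.0 and 35.0
```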

scrap_trend ¤

scrap_trend(
    scrap_uuid: str,
    *,
    value_column: str = "value_double",
    window: str = "1D"
) -> pd.DataFrame

Track scrap quantity trend over time.

Parameters:

Name Type Description Default
scrap_uuid str

UUID for scrap weight/count signal.

required
value_column str

Column containing scrap values.

'value_double'
window str

Time window for aggregation (default '1D').

'1D'

Returns:

Type Description
DataFrame

DataFrame with columns:

DataFrame
  • period, scrap_quantity

TargetTracking ¤

TargetTracking(
    dataframe: DataFrame,
    *,
    time_column: str = "systime",
    shift_definitions: Optional[
        Dict[str, tuple[str, str]]
    ] = None
)

Bases: Base

Compare any production metric to targets.

Every plant has daily/shift targets. This module provides a generic way to compare actual performance against those targets.

Merge keys: [date, shift] for shift-level, [date] for daily.

Pipeline example::

target = TargetTracking(df)
result = target.compare_to_target('counter', {'shift_1': 450})
# → merge with PerformanceLossTracking.performance_by_shift() on [date, shift]
# → merge with QualityTracking.nok_by_shift() on [date, shift]
# → result['status'] column enables filtering/alerting
Example usage

tracker = TargetTracking(df)

Compare counter to fixed target¤

result = tracker.compare_to_target(
    metric_uuid='production_counter',
    targets={'shift_1': 450, 'shift_2': 450, 'shift_3': 400},
)

Achievement summary over time¤

summary = tracker.target_achievement_summary(
    metric_uuid='production_counter',
    daily_target=1300,
)

compare_to_target ¤

compare_to_target(
    metric_uuid: str,
    targets: Dict[str, float],
    *,
    value_column: str = "value_integer"
) -> pd.DataFrame

Compare actual metric to per-shift targets.

Parameters:

Name Type Description Default
metric_uuid str

UUID of the metric signal (counter).

required
targets Dict[str, float]

Dict mapping shift names to target values.

required
value_column str

Column containing metric values.

'value_integer'

Returns:

Type Description
DataFrame

DataFrame with columns:

DataFrame
  • date, shift, actual, target, variance, achievement_pct, status
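The variance and achievement columns follow directly from the per-shift targets dict. A sketch with hypothetical actuals; the status labels here are illustrative, not necessarily the module's exact wording:

```python
import pandas as pd

# Sketch of the target comparison (status labels are illustrative)
df = pd.DataFrame({"shift": ["shift_1", "shift_2"], "actual": [430, 465]})
targets = {"shift_1": 450, "shift_2": 450}
df["target"] = df["shift"].map(targets)
df["variance"] = df["actual"] - df["target"]
df["achievement_pct"] = df["actual"] / df["target"] * 100
df["status"] = ["on_target" if v >= 0 else "below_target" for v in df["variance"]]
# → variance: -20 and +15
```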

target_achievement_summary ¤

target_achievement_summary(
    metric_uuid: str,
    daily_target: float,
    *,
    value_column: str = "value_integer"
) -> pd.DataFrame

Summarize target achievement over time.

Parameters:

Name Type Description Default
metric_uuid str

UUID of the metric signal (counter).

required
daily_target float

Daily production target.

required
value_column str

Column containing metric values.

'value_integer'

Returns:

Type Description
DataFrame

DataFrame with columns:

DataFrame
  • date, actual, target, variance, achievement_pct, status

target_hit_rate ¤

target_hit_rate(
    metric_uuid: str,
    daily_target: float,
    *,
    value_column: str = "value_integer"
) -> Dict[str, Union[float, int]]

Measure how often the daily target is met.

Parameters:

Name Type Description Default
metric_uuid str

UUID of the metric signal (counter).

required
daily_target float

Daily production target.

required
value_column str

Column containing metric values.

'value_integer'

Returns:

Type Description
Dict[str, Union[float, int]]

Dict with:

Dict[str, Union[float, int]]
  • total_days: Number of days analyzed.
Dict[str, Union[float, int]]
  • days_on_target: Days where target was met.
Dict[str, Union[float, int]]
  • hit_rate_pct: Percentage of days meeting target.
Dict[str, Union[float, int]]
  • avg_achievement_pct: Average achievement across days.
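The returned metrics reduce to simple counting over daily totals. A sketch with hypothetical daily actuals against the target:

```python
# Sketch of the hit-rate arithmetic over hypothetical daily totals
daily_actuals = [1350, 1280, 1310, 1190]
daily_target = 1300

days_on_target = sum(a >= daily_target for a in daily_actuals)
hit_rate_pct = days_on_target / len(daily_actuals) * 100
avg_achievement_pct = (
    sum(a / daily_target for a in daily_actuals) / len(daily_actuals) * 100
)
# → 2 of 4 days on target, hit_rate_pct = 50.0
```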

ShiftHandoverReport ¤

ShiftHandoverReport(
    dataframe: DataFrame,
    *,
    time_column: str = "systime",
    shift_definitions: Optional[
        Dict[str, tuple[str, str]]
    ] = None
)

Bases: Base

Generate automated shift handover reports.

Combines production, quality, and downtime data into a single summary suitable for shift handover meetings.

Merge keys: [date, shift] — the report output is keyed on these columns.

Two usage patterns:

Pattern A — from raw signals (one-step):

report = ShiftHandoverReport(df)
result = report.generate_report(
    counter_uuid='prod', ok_counter_uuid='ok',
    nok_counter_uuid='nok', state_uuid='state',
    targets={'shift_1': 450, 'shift_2': 450},
)

Pattern B — pipeline (compose from upstream modules):

# Step 1: compute upstream DataFrames
prod   = ShiftReporting(df).shift_production('counter')
qual   = QualityTracking(df).nok_by_shift('ok', 'nok')
downtime = DowntimeTracking(df).downtime_by_shift('state')

# Step 2: assemble into handover report
result = ShiftHandoverReport.from_shift_data(
    production_df=prod,
    quality_df=qual,
    downtime_df=downtime,
    targets={'shift_1': 450, 'shift_2': 450},
)

Both patterns return the same output schema.

from_shift_data staticmethod ¤

from_shift_data(
    production_df: DataFrame,
    quality_df: Optional[DataFrame] = None,
    downtime_df: Optional[DataFrame] = None,
    *,
    targets: Optional[Dict[str, float]] = None,
    report_date: Optional[str] = None
) -> pd.DataFrame

Build a handover report from pre-computed shift-level DataFrames.

This is the pipeline-friendly entry-point. Instead of reading raw signals, it accepts DataFrames that were already computed by upstream modules (ShiftReporting, QualityTracking, DowntimeTracking, etc.).

Parameters:

Name Type Description Default
production_df DataFrame

DataFrame with [date, shift, quantity] — e.g. from ShiftReporting.shift_production().

required
quality_df Optional[DataFrame]

DataFrame with [date, shift, ok_parts, nok_parts, quality_pct] — e.g. from QualityTracking.nok_by_shift(). Optional.

None
downtime_df Optional[DataFrame]

DataFrame with [date, shift, availability_pct, downtime_minutes] — e.g. from DowntimeTracking.downtime_by_shift(). Optional.

None
targets Optional[Dict[str, float]]

Per-shift production targets (dict).

None
report_date Optional[str]

Specific date (YYYY-MM-DD). If None, uses latest.

None

Returns:

Type Description
DataFrame

DataFrame with columns:

DataFrame
  • date, shift, production, production_target, production_achievement_pct, ok_parts, nok_parts, quality_pct, availability_pct, downtime_minutes

generate_report ¤

generate_report(
    counter_uuid: str,
    ok_counter_uuid: str,
    nok_counter_uuid: str,
    state_uuid: str,
    *,
    targets: Optional[Dict[str, float]] = None,
    quality_target_pct: float = 98.0,
    availability_target_pct: float = 90.0,
    running_value: str = "Running",
    value_column_counter: str = "value_integer",
    value_column_state: str = "value_string",
    report_date: Optional[str] = None
) -> pd.DataFrame

Generate a shift handover report from raw timeseries signals.

For pipeline usage with pre-computed DataFrames, use from_shift_data instead.

Parameters:

Name Type Description Default
counter_uuid str

UUID of production counter.

required
ok_counter_uuid str

UUID of good parts counter.

required
nok_counter_uuid str

UUID of defective parts counter.

required
state_uuid str

UUID of machine state signal.

required
targets Optional[Dict[str, float]]

Per-shift production targets.

None
quality_target_pct float

Quality target percentage.

98.0
availability_target_pct float

Availability target percentage.

90.0
running_value str

Value indicating machine is running.

'Running'
value_column_counter str

Column for counter values.

'value_integer'
value_column_state str

Column for state values.

'value_string'
report_date Optional[str]

Specific date (YYYY-MM-DD). If None, uses latest date.

None

Returns:

Type Description
DataFrame

DataFrame with columns:

DataFrame
  • date, shift, production, production_target, production_achievement_pct, ok_parts, nok_parts, quality_pct, availability_pct, downtime_minutes

highlight_issues ¤

highlight_issues(
    counter_uuid: Optional[str] = None,
    ok_counter_uuid: Optional[str] = None,
    nok_counter_uuid: Optional[str] = None,
    state_uuid: Optional[str] = None,
    *,
    report_df: Optional[DataFrame] = None,
    thresholds: Optional[Dict[str, float]] = None,
    targets: Optional[Dict[str, float]] = None,
    running_value: str = "Running",
    value_column_counter: str = "value_integer",
    value_column_state: str = "value_string",
    report_date: Optional[str] = None
) -> List[Dict[str, str]]

Identify issues that need attention.

Can be called in two ways:

  1. From raw signals (provide UUIDs): highlight_issues('prod', 'ok', 'nok', 'state', thresholds={...})

  2. From a pre-built report DataFrame: highlight_issues(report_df=my_report, thresholds={...})

Parameters:

Name Type Description Default
counter_uuid Optional[str]

UUID of production counter (raw-signal mode).

None
ok_counter_uuid Optional[str]

UUID of good parts counter (raw-signal mode).

None
nok_counter_uuid Optional[str]

UUID of defective parts counter (raw-signal mode).

None
state_uuid Optional[str]

UUID of machine state signal (raw-signal mode).

None
report_df Optional[DataFrame]

Pre-computed report DataFrame (pipeline mode). If provided, UUID arguments are ignored.

None
thresholds Optional[Dict[str, float]]

Minimum acceptable values for each metric. Defaults to production_achievement_pct=95, quality_pct=98, availability_pct=90.

None
targets Optional[Dict[str, float]]

Per-shift production targets.

None
running_value str

Value indicating machine is running.

'Running'
value_column_counter str

Column for counter values.

'value_integer'
value_column_state str

Column for state values.

'value_string'
report_date Optional[str]

Specific date (YYYY-MM-DD).

None

Returns:

Type Description
List[Dict[str, str]]

List of dicts with keys: shift, metric, value, threshold, severity.

List[Dict[str, str]]

severity is 'warning' (within 5% of threshold) or 'critical'.
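The severity rule can be sketched as a small classifier. Interpreting "within 5% of threshold" as within 5 percentage points below it (an assumption; the module may define the margin differently):

```python
# Sketch of the severity rule: 'warning' when the value is within
# 5 percentage points below the threshold, otherwise 'critical'
# (the 5-point interpretation is an assumption)
def severity(value: float, threshold: float) -> str:
    if value >= threshold:
        return "ok"          # not flagged as an issue
    return "warning" if value >= threshold - 5.0 else "critical"
```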

PeriodSummary ¤

PeriodSummary(
    dataframe: DataFrame, *, time_column: str = "systime"
)

Bases: Base

Aggregate daily metrics into weekly/monthly summaries.

Merge keys: [week_start, week_end] for weekly, [year, month] for monthly.

Two usage patterns:

Pattern A — from raw signals:

summary = PeriodSummary(df)
weekly = summary.weekly_summary(counter_uuid='prod')

Pattern B — pipeline (compose from upstream modules):

# Build a daily DataFrame from any combination of upstream modules
quality = QualityTracking(df).daily_quality_summary('ok', 'nok')
downtime = DowntimeTracking(df).availability_trend('state', window='1D')

# Merge on date
daily = quality.merge(downtime.rename(columns={'period': 'date'}), on='date')

# Roll up to weekly/monthly
weekly = PeriodSummary.from_daily_data(daily, freq='W')
monthly = PeriodSummary.from_daily_data(daily, freq='MS')

from_daily_data staticmethod ¤

from_daily_data(
    daily_df: DataFrame,
    *,
    freq: str = "W",
    date_column: str = "date"
) -> pd.DataFrame

Roll up a pre-computed daily DataFrame to weekly or monthly.

This is the pipeline-friendly entry-point. It accepts any daily DataFrame (e.g. merged output of QualityTracking, DowntimeTracking, OEECalculator) and aggregates numeric columns.

Numeric columns are aggregated as follows:

- Columns containing _pct or pct: averaged (mean).
- Columns containing parts, quantity, production, minutes: summed.
- All other numeric columns: summed.

The output always includes period_start, period_end, and days columns.

Parameters:

Name Type Description Default
daily_df DataFrame

DataFrame with a date column and numeric metric columns. Expected merge key: [date].

required
freq str

Pandas frequency string. 'W' = weekly (Monday start), 'MS' = monthly.

'W'
date_column str

Name of the date column.

'date'

Returns:

Type Description
DataFrame

DataFrame with period_start, period_end, days, and aggregated

DataFrame

metric columns.

Example::

quality = QualityTracking(df).daily_quality_summary('ok', 'nok')
weekly = PeriodSummary.from_daily_data(quality, freq='W')
# → [period_start, period_end, days, ok_parts, nok_parts,
#    total_parts, quality_pct, ...]
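The aggregation rule stated above (percentage columns averaged, count-like columns summed) can be sketched directly in pandas. The daily data here is hypothetical, and weekly grouping is shown via to_period rather than the module's internals:

```python
import pandas as pd

# Sketch of the stated aggregation rule: *_pct columns averaged, others summed
daily = pd.DataFrame({
    "date": pd.date_range("2024-01-01", periods=4, freq="D"),
    "ok_parts": [400, 420, 390, 410],
    "quality_pct": [98.0, 97.0, 99.0, 96.0],
})
agg = {c: ("mean" if "pct" in c else "sum") for c in daily.columns if c != "date"}
weekly = daily.groupby(daily["date"].dt.to_period("W")).agg(agg)
# → one week: ok_parts summed to 1620, quality_pct averaged to 97.5
```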

weekly_summary ¤

weekly_summary(
    counter_uuid: str,
    ok_counter_uuid: Optional[str] = None,
    nok_counter_uuid: Optional[str] = None,
    *,
    value_column: str = "value_integer"
) -> pd.DataFrame

Roll up daily production to weekly summaries from raw signals.

For pipeline usage with pre-computed DataFrames, use from_daily_data instead.

Parameters:

Name Type Description Default
counter_uuid str

UUID of production counter.

required
ok_counter_uuid Optional[str]

UUID of good parts counter (optional).

None
nok_counter_uuid Optional[str]

UUID of defective parts counter (optional).

None
value_column str

Column containing counter values.

'value_integer'

Returns:

Type Description
DataFrame

DataFrame with columns:

DataFrame
  • week_start, week_end, total_production, daily_avg, production_days, [ok_parts, nok_parts, quality_pct]

monthly_summary ¤

monthly_summary(
    counter_uuid: str,
    ok_counter_uuid: Optional[str] = None,
    nok_counter_uuid: Optional[str] = None,
    *,
    value_column: str = "value_integer"
) -> pd.DataFrame

Roll up daily production to monthly summaries from raw signals.

For pipeline usage with pre-computed DataFrames, use from_daily_data instead.

Parameters:

Name Type Description Default
counter_uuid str

UUID of production counter.

required
ok_counter_uuid Optional[str]

UUID of good parts counter (optional).

None
nok_counter_uuid Optional[str]

UUID of defective parts counter (optional).

None
value_column str

Column containing counter values.

'value_integer'

Returns:

Type Description
DataFrame

DataFrame with columns:

DataFrame
  • year, month, total_production, daily_avg, production_days, [ok_parts, nok_parts, quality_pct]

compare_periods ¤

compare_periods(
    counter_uuid: str,
    period1: tuple[str, str],
    period2: tuple[str, str],
    *,
    value_column: str = "value_integer"
) -> pd.DataFrame

Compare production between two time periods.

Parameters:

Name Type Description Default
counter_uuid str

UUID of production counter.

required
period1 tuple[str, str]

Tuple of (start_date, end_date) for first period.

required
period2 tuple[str, str]

Tuple of (start_date, end_date) for second period.

required
value_column str

Column containing counter values.

'value_integer'

Returns:

Type Description
DataFrame

DataFrame with columns:

DataFrame
  • metric, period1_value, period2_value, change, change_pct
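The change columns follow from the two period totals. A sketch with hypothetical production totals:

```python
# Sketch of the change columns from two hypothetical period totals
period1_value = 9800.0   # total production, first period
period2_value = 10500.0  # total production, second period

change = period2_value - period1_value
change_pct = change / period1_value * 100
# → change = 700.0, change_pct ≈ 7.14
```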