Reactive Computation Graphs State Management for Python with first-class async support, inspired by Angular's reactivity model.
Reactive Computation Graphs for Python with first-class async support, inspired by Angular’s reactivity model.
pip install reaktiv
# or with uv
uv pip install reaktiv
reaktiv
creates efficient reactive computation graphs that only recalculate values when their dependencies change. The system automatically tracks dependencies between signals, computed values, and effects, eliminating the need for manual subscription management.
Key features:
Full documentation is available at https://reaktiv.readthedocs.io/.
from reaktiv import Signal, Computed, Effect
# Create some signals
name = Signal("Alice")
age = Signal(30)
city = Signal("New York")
# Computed value with automatically tracked dependencies
# The system detects that this depends on name and age
greeting = Computed(lambda: f"{name()} is {age()} years old")
# Another computed value with different dependencies
# The system detects this depends only on name and city
location = Computed(lambda: f"{name()} lives in {city()}")
# Create effects to demonstrate updates
print("Initial Setup:")
greeting_effect = Effect(lambda: print(f"Greeting: {greeting()}"))
location_effect = Effect(lambda: print(f"Location: {location()}"))
# Changing age only triggers recomputation of greeting
print("\nChanging age to 31:")
age.set(31)
# Only greeting recomputed (location unaffected)
# Changing city only triggers recomputation of location
print("\nChanging city to Boston:")
city.set("Boston")
# Only location recomputed (greeting unaffected)
# Changing name triggers recomputation of both derived values
print("\nChanging name to Bob:")
name.set("Bob")
# Both greeting and location recomputed
update()
Instead of calling set(new_value)
, update()
lets you modify a signal based on its current value.
from reaktiv import Signal
counter = Signal(0)
# Standard way
counter.set(counter() + 1)
# Using update() for cleaner syntax
counter.update(lambda x: x + 1)
print(counter()) # 2
from reaktiv import Signal, Computed
# Synchronous context example
price = Signal(100)
tax_rate = Signal(0.2)
total = Computed(lambda: price() * (1 + tax_rate()))
print(total()) # 120.0
tax_rate.set(0.25)
print(total()) # 125.0
graph TD
%% Define node subgraphs for better organization
subgraph "Data Sources"
S1[Signal A]
S2[Signal B]
S3[Signal C]
end
subgraph "Derived Values"
C1[Computed X]
C2[Computed Y]
end
subgraph "Side Effects"
E1[Effect 1]
E2[Effect 2]
end
subgraph "External Systems"
EXT1[UI Update]
EXT2[API Call]
EXT3[Database Write]
end
%% Define relationships between nodes
S1 -->|"get()"| C1
S2 -->|"get()"| C1
S2 -->|"get()"| C2
S3 -->|"get()"| C2
C1 -->|"get()"| E1
C2 -->|"get()"| E1
S3 -->|"get()"| E2
C2 -->|"get()"| E2
E1 --> EXT1
E1 --> EXT2
E2 --> EXT3
%% Change propagation path
S1 -.-> |"1\. set()"| C1
C1 -.->|"2\. recompute"| E1
E1 -.->|"3\. execute"| EXT1
%% Style nodes by type
classDef signal fill:#4CAF50,color:white,stroke:#388E3C,stroke-width:1px
classDef computed fill:#2196F3,color:white,stroke:#1976D2,stroke-width:1px
classDef effect fill:#FF9800,color:white,stroke:#F57C00,stroke-width:1px
%% Apply styles to nodes
class S1,S2,S3 signal
class C1,C2 computed
class E1,E2 effect
%% Legend node
LEGEND[" Legend:
• Signal: Stores a value, notifies dependents
• Computed: Derives value from dependencies
• Effect: Runs side effects when dependencies change
• → Data flow / Dependency (read)
• ⟿ Change propagation (update)
"]
classDef legend fill:none,stroke:none,text-align:left
class LEGEND legend
reaktiv
provides three core primitives:
If you’ve worked with modern frontend frameworks like React, Vue, SolidJS or Angular, you’re familiar with the power of reactive state management. The idea is simple but transformative: when data changes, everything that depends on it updates automatically.
While this pattern revolutionized frontend development, its benefits are equally powerful in backend systems where complex state management is often overlooked or implemented with brittle, ad-hoc solutions.
reaktiv
brings these reactive programming advantages to your Python backend projects:
asyncio
for managing real-time data flowsreaktiv
addresses key challenges in backend state management:
Even in “stateless” architectures, ephemeral state still exists during request processing. reaktiv
helps manage this complexity without the boilerplate of observers, callbacks, or event dispatchers.
Many backend developers view reactive libraries as just another pub/sub system and question their value in “stateless” architectures. However, reaktiv
addresses fundamentally different problems:
Pub/Sub Systems | Reaktiv |
---|---|
Message delivery between components | Automatic state dependency tracking |
Point-to-point or broadcast messaging | Fine-grained computation graphs |
Manual subscription management | Automatic dependency detection |
Focus on message transport | Focus on state derivation |
Stateless by design | Intentional state management |
Even in “stateless” microservices and serverless functions, state exists during request processing:
reaktiv
helps manage this ephemeral state with less code, fewer bugs, and better maintainability.
Here are some simple examples to help you understand reaktiv’s benefits:
from reaktiv import Signal, Computed, Effect
# Create base values (signals)
price = Signal(10.0)
quantity = Signal(2)
tax_rate = Signal(0.1) # 10% tax
# Create derived values (computed)
subtotal = Computed(lambda: price() * quantity())
tax = Computed(lambda: subtotal() * tax_rate())
total = Computed(lambda: subtotal() + tax())
# Create a side effect for logging
logger = Effect(lambda: print(f"Order Total: ${total():.2f}"))
# Initial output: "Order Total: $22.00"
# Change the quantity
quantity.set(3)
# Automatically logs: "Order Total: $33.00"
# Change the price
price.set(12.0)
# Automatically logs: "Order Total: $39.60"
# Change tax rate
tax_rate.set(0.15)
# Automatically logs: "Order Total: $41.40"
from reaktiv import Signal, Computed, Effect
# Base signals: system metrics
memory_usage = Signal(75) # percent
cpu_usage = Signal(50) # percent
# Computed value: system status based on thresholds
system_status = Computed(lambda:
"critical" if memory_usage() > 90 or cpu_usage() > 90 else
"warning" if memory_usage() > 70 or cpu_usage() > 70 else
"normal"
)
# Effect: alert when system status changes
def alert_on_status():
status = system_status()
print(f"System status: {status}")
if status != "normal":
print(f" Memory: {memory_usage()}%, CPU: {cpu_usage()}%")
status_monitor = Effect(alert_on_status)
# Initial output: "System status: warning"
# " Memory: 75%, CPU: 50%"
# Simulate memory dropping
memory_usage.set(60)
# Output: "System status: normal"
# Simulate CPU spiking
cpu_usage.set(95)
# Output: "System status: critical"
# " Memory: 60%, CPU: 95%"
from reaktiv import Signal, Computed, Effect
import asyncio
async def demo_async_monitoring():
# Base signals: system metrics
memory_usage = Signal(75) # percent
cpu_usage = Signal(50) # percent
# Computed value: system status
system_status = Computed(lambda:
"critical" if memory_usage() > 90 or cpu_usage() > 90 else
"warning" if memory_usage() > 70 or cpu_usage() > 70 else
"normal"
)
# Async effect: alert and take action when status changes
async def async_alert_handler():
status = system_status()
print(f"System status: {status}")
if status == "critical":
print(f" Memory: {memory_usage()}%, CPU: {cpu_usage()}%")
# Simulate sending notification
await asyncio.sleep(0.1) # non-blocking wait
print(" ✉️ Alert notification sent")
# Simulate recovery action
if cpu_usage() > 90:
await asyncio.sleep(0.2) # simulating some async operation
print(" 🔄 Triggered automatic scaling")
# Register the async effect
status_monitor = Effect(async_alert_handler)
print("Starting monitoring...")
# Give effect time to run initially
await asyncio.sleep(0.2)
# Simulate CPU spike
print("\nSimulating CPU spike:")
cpu_usage.set(95)
await asyncio.sleep(0.5) # Allow effect to complete
# Simulate recovery
print("\nSimulating recovery:")
cpu_usage.set(50)
memory_usage.set(60)
await asyncio.sleep(0.2)
asyncio.run(demo_async_monitoring())
from reaktiv import Signal, Computed, Effect
# Different configuration sources
default_config = Signal({"timeout": 30, "retries": 3, "log_level": "INFO"})
user_config = Signal({})
# Effective config combines both sources, with user settings taking precedence
effective_config = Computed(lambda: {**default_config(), **user_config()})
# Log when config changes
config_logger = Effect(lambda: print(f"Active config: {effective_config()}"))
# Initial output: "Active config: {'timeout': 30, 'retries': 3, 'log_level': 'INFO'}"
# When user updates their preferences
user_config.set({"timeout": 60, "log_level": "DEBUG"})
# Automatically logs: "Active config: {'timeout': 60, 'retries': 3, 'log_level': 'DEBUG'}"
# When defaults are updated (e.g., from a config file)
default_config.update(lambda cfg: {**cfg, "retries": 5, "max_connections": 100})
# Automatically logs: "Active config: {'timeout': 60, 'retries': 5, 'log_level': 'DEBUG', 'max_connections': 100}"
from reaktiv import Signal, Computed
# Database simulation
database = {"user1": "Alice", "user2": "Bob"}
# Cache invalidation signal
last_update = Signal(0) # timestamp or version number
# User data with automatic cache refresh
def fetch_user(user_id):
# In a real app, this would actually query a database
return database.get(user_id)
active_user_id = Signal("user1")
# This computed value automatically refreshes when active_user_id changes
# or when the cache is invalidated via last_update
user_data = Computed(lambda: {
"id": active_user_id(),
"name": fetch_user(active_user_id()),
"cache_version": last_update()
})
print(user_data()) # {'id': 'user1', 'name': 'Alice', 'cache_version': 0}
# Switch to another user - cache updates automatically
active_user_id.set("user2")
print(user_data()) # {'id': 'user2', 'name': 'Bob', 'cache_version': 0}
# Backend data changes - we update the database and invalidate cache
database["user2"] = "Robert"
last_update.set(1) # increment version number
print(user_data()) # {'id': 'user2', 'name': 'Robert', 'cache_version': 1}
from reaktiv import Signal, Computed, Effect
import time
# Event stream (simulating incoming data)
events = Signal([])
# Compute statistics from the events
event_count = Computed(lambda: len(events()))
recent_events = Computed(lambda: [e for e in events() if time.time() - e["timestamp"] < 60])
error_count = Computed(lambda: len([e for e in events() if e["level"] == "ERROR"]))
# Monitor for problems
def check_errors():
if error_count() >= 3:
print(f"ALERT: {error_count()} errors detected in the event stream!")
error_monitor = Effect(check_errors)
# Process new events
def add_event(level, message):
new_event = {"timestamp": time.time(), "level": level, "message": message}
events.update(lambda current: current + [new_event])
# Simulate incoming events
add_event("INFO", "Application started")
add_event("INFO", "Processing request")
add_event("ERROR", "Database connection failed")
add_event("ERROR", "Retry failed")
add_event("ERROR", "Giving up after 3 attempts")
# Automatically triggers: "ALERT: 3 errors detected in the event stream!"
from reaktiv import Signal, Computed, Effect
import asyncio
import time
# Demo async event processing
async def demo_async_events():
# Event stream
events = Signal([])
# Derived statistics
error_count = Computed(lambda: len([e for e in events() if e["level"] == "ERROR"]))
warning_count = Computed(lambda: len([e for e in events() if e["level"] == "WARNING"]))
# Async effect for error handling
async def handle_errors():
errors = error_count()
if errors > 0:
print(f"Found {errors} errors")
# Simulate error processing that takes time
await asyncio.sleep(0.2)
if errors >= 3:
print("🚨 Critical error threshold reached")
await asyncio.sleep(0.1) # Simulate sending alert
print("✉️ Error notification dispatched to on-call team")
# Register async effect
error_handler = Effect(handle_errors)
# Function to add events
async def add_event_async(level, message):
new_event = {"timestamp": time.time(), "level": level, "message": message}
events.update(lambda current: current + [new_event])
# Simulate some processing time
await asyncio.sleep(0.1)
print("Starting event monitoring...")
# Add some events
await add_event_async("INFO", "Application started")
await add_event_async("INFO", "User logged in")
await add_event_async("WARNING", "Slow database query detected")
# Add error events that will trigger our effect
print("\nSimulating error condition:")
await add_event_async("ERROR", "Database connection timeout")
await add_event_async("ERROR", "Retry failed")
await add_event_async("ERROR", "Service unavailable")
# Give effect time to complete
await asyncio.sleep(0.5)
print(f"\nFinal status: {error_count()} errors, {warning_count()} warnings")
asyncio.run(demo_async_events())
Effects must be retained (assigned to a variable) to prevent garbage collection. If you create an Effect without assigning it to a variable, it may be immediately garbage collected:
# INCORRECT: Effect will be garbage collected immediately
Effect(lambda: print(f"Value changed: {my_signal()}"))
# CORRECT: Effect is retained
my_effect = Effect(lambda: print(f"Value changed: {my_signal()}"))
When using Effects in classes, assign them to instance attributes in the constructor:
class MyComponent:
def __init__(self):
self._counter = Signal(0)
def _handle_counter_change():
print(f"Counter changed: {self._counter()}")
# Assign Effect to self to prevent garbage collection
self._effect = Effect(_handle_counter_change)
By default, reaktiv uses identity comparison (is
) to detect value changes. This means that mutable objects like lists, dictionaries, or custom objects with the same content but different identity will be treated as different values:
items = Signal([1, 2, 3])
# This WILL trigger updates because it's a different list instance
items.set([1, 2, 3])
# Modifying the list in-place WON'T trigger updates
current = items()
current.append(4) # Signal doesn't detect this change
For collection types, you can provide a custom equality function:
def list_equal(a, b):
return len(a) == len(b) and all(a_item == b_item for a_item, b_item in zip(a, b))
items = Signal([1, 2, 3], equal=list_equal)
# With custom equality, this WON'T trigger updates (values are equal)
items.set([1, 2, 3])
# This WILL trigger updates (values are different)
items.set([1, 2, 3, 4])
You can find more example scripts in the examples folder to help you get started with using this project.
Inspired by Angular Signals and TC39 Signals Proposal • Built for Python’s async-first world and backend uses • Made in Hamburg