Introduction
Decentralized systems eliminate central points of failure and enable trustless data sharing. This article covers IPFS, distributed storage, and decentralized web technologies.
Key Statistics:
- IPFS nodes: 100K+ active
- Filecoin storage: 10+ EB secured
- DWeb market: Growing rapidly
- Decentralized storage: 80% cheaper than cloud
Decentralized Architecture
┌───────────────────────────────────────────────────────────────────┐
│ Decentralized Storage Architecture                                │
├───────────────────────────────────────────────────────────────────┤
│                                                                   │
│ Traditional (Centralized)                                         │
│ ├── Single point of failure                                       │
│ ├── Vendor lock-in                                                │
│ ├── Slow at scale                                                 │
│ └── Expensive at scale                                            │
│                                                                   │
│ Decentralized                                                     │
│ ├── No single point of failure                                    │
│ ├── Vendor neutral                                                │
│ ├── Fast via caching nearby                                       │
│ └── Cheaper at scale                                              │
│                                                                   │
│ Key Technologies                                                  │
│ ├── Content addressing (CID)                                      │
│ ├── Merkle DAG (Directed Acyclic Graph)                           │
│ ├── DHT (Distributed Hash Table)                                  │
│ ├── P2P protocols (Bitswap)                                       │
│ └── Cryptographic verification                                    │
│                                                                   │
│ Storage Layers                                                    │
│ ├── Hot: IPFS, Swarm                                              │
│ ├── Warm: Filecoin, Arweave                                       │
│ └── Cold: Crust, Sia                                              │
│                                                                   │
└───────────────────────────────────────────────────────────────────┘
IPFS Implementation
#!/usr/bin/env python3
"""IPFS interaction with Python."""
import ipfshttpclient
import hashlib
import json
from pathlib import Path
class IPFSManager:
    """Manage common IPFS operations through the daemon's HTTP API.

    Every method degrades gracefully when no daemon is reachable
    (self.client is None): it returns an error dict / empty value
    instead of raising.
    """

    def __init__(self, api_endpoint: str = "/ip4/127.0.0.1/tcp/5001"):
        """Connect to the IPFS daemon; leaves self.client as None on failure."""
        try:
            self.client = ipfshttpclient.connect(api_endpoint)
        except Exception as e:
            print(f"IPFS not available: {e}")
            self.client = None

    def add_file(self, file_path: str) -> dict:
        """Add a single file to IPFS and return its CID, name and size."""
        if not self.client:
            return {"error": "IPFS not connected"}
        result = self.client.add(file_path)
        return {
            "cid": result["Hash"],
            "name": result["Name"],
            "size": result["Size"],
        }

    def add_directory(self, dir_path: str) -> dict:
        """Add a directory recursively; return the root directory's entry.

        Fix: with recursive=True the client returns a list of per-entry
        dicts (root directory last), not a single dict, so indexing the
        raw result with ["Hash"] raised a TypeError.
        """
        if not self.client:
            return {"error": "IPFS not connected"}
        result = self.client.add(dir_path, recursive=True)
        root = result[-1] if isinstance(result, list) else result
        return {
            "cid": root["Hash"],
            "name": root["Name"],
            "size": root["Size"],
        }

    def cat_file(self, cid: str) -> bytes:
        """Return the raw content of the object behind `cid` (b"" if offline)."""
        if not self.client:
            return b""
        return self.client.cat(cid)

    def pin_file(self, cid: str) -> bool:
        """Pin `cid` so the local garbage collector keeps it; True on success."""
        if not self.client:
            return False
        self.client.pin.add(cid)
        return True

    def get_stats(self, cid: str) -> dict:
        """Return size/block statistics for `cid` ({} if offline)."""
        if not self.client:
            return {}
        stat = self.client.files.stat(f"/ipfs/{cid}")
        return {
            "size": stat.get("Size"),
            "cumulative_size": stat.get("CumulativeSize"),
            "blocks": stat.get("Blocks"),
            "type": stat.get("Type"),
        }

    def list_pins(self) -> list:
        """List pinned CIDs with their pin type.

        Fix: pin.ls() responds with {"Keys": {cid: {"Type": ...}}}, so we
        iterate that mapping instead of the response dict itself.
        """
        if not self.client:
            return []
        pins = []
        for cid, info in self.client.pin.ls().get("Keys", {}).items():
            pins.append({
                "cid": cid,
                "type": info["Type"],
            })
        return pins

    def create_ipns_record(self, cid: str, key_name: str = "self") -> str:
        """Publish `cid` under an IPNS name and return that name ("" if offline)."""
        if not self.client:
            return ""
        # Publish CID to IPNS under the daemon key `key_name`.
        result = self.client.name.publish(cid, key=key_name)
        return result["Name"]
class ContentAddressing:
    """Helpers for computing and parsing IPFS content identifiers (CIDs)."""

    @staticmethod
    def calculate_cid_v0(data: bytes) -> str:
        """Calculate a CIDv0: base58btc-encoded sha2-256 multihash.

        Multihash layout: <hash-func-code=0x12><digest-len=0x20><32-byte digest>.
        """
        import base58
        digest = hashlib.sha256(data).digest()
        multihash = bytes([0x12, 0x20]) + digest  # sha2-256 code + 32-byte length
        return base58.b58encode(multihash).decode()

    @staticmethod
    def calculate_cid_v1(data: bytes) -> str:
        """Calculate a CIDv1 for raw data (multibase + multicodec + multihash).

        Fix: the `multiformats` package exposes a CID constructor taking
        (base, version, codec, (hash-name, digest)); the previous
        `multiformats.cid.CIDv1(...)` call does not exist in that API.
        """
        from multiformats import CID
        digest = hashlib.sha256(data).digest()
        cid = CID("base32", 1, "raw", ("sha2-256", digest))
        return str(cid)

    @staticmethod
    def parse_cid(cid_string: str) -> dict:
        """Parse a CID string into its components; return {} when undecodable."""
        try:
            from multiformats import CID
            cid = CID.decode(cid_string)
            # NOTE(review): attribute names follow the multiformats package
            # (codec/hashfun objects, raw digest bytes) — confirm against the
            # installed version.
            return {
                "version": cid.version,
                "codec": cid.codec.name,
                "mh_code": cid.hashfun.name,
                "mh_length": len(cid.digest),
            }
        except Exception:  # narrowed from bare except: invalid CID or missing lib
            return {}
def create_merkle_dag():
    """Build a small Merkle DAG via the IPFS Mutable File System (MFS).

    Demo only: requires a running local IPFS daemon and the files
    file1.txt / file2.txt in the current directory.

    Target layout:
        root/
        ├── dir1/
        │   └── file1.txt
        └── dir2/
            └── file2.txt
    """
    import ipfshttpclient
    client = ipfshttpclient.connect()
    # Add the leaf files first so we have their CIDs.
    file1 = client.add("file1.txt")
    file2 = client.add("file2.txt")
    # Fix: MFS directories must exist before copying into them, and
    # parents=True creates the intermediate /root directory too.
    client.files.mkdir("/root/dir1", parents=True)
    # Fix: files.cp sources must be absolute /ipfs/<cid> paths, not bare hashes.
    client.files.cp(f"/ipfs/{file1['Hash']}", "/root/dir1/file1.txt")
    client.files.mkdir("/root/dir2", parents=True)
    client.files.cp(f"/ipfs/{file2['Hash']}", "/root/dir2/file2.txt")
    # Fix: files.stat reports the CID under "Hash" (there is no "Cid" key);
    # the previous root_cid = mkdir(...) assignment captured None.
    stat = client.files.stat("/root")
    print(f"Root CID: {stat['Hash']}")
Filecoin Storage
#!/usr/bin/env python3
"""Filecoin storage integration."""
from lotus import LotusClient
from pathlib import Path
from typing import Dict, List
import time
class FilecoinStorage:
    """Manage Filecoin storage deals through a Lotus node API."""

    def __init__(self, api_token: str, api_endpoint: str):
        """Create a Lotus client bound to the given endpoint/token."""
        self.client = LotusClient(api_endpoint, api_token)

    def create_deal(self, cid: str,
                    duration: int = 180) -> Dict:
        """Propose a storage deal for `cid` with an automatically chosen miner.

        Raises RuntimeError when the network reports no miners.
        NOTE(review): the deal API labels `duration` in days here but pays
        per epoch — confirm the unit against the Lotus client in use.
        """
        miners = self.client.state.list_miners()
        if not miners:
            # Fix: previously fell through to miners[0] and raised IndexError.
            raise RuntimeError("no storage miners available")
        selected_miner = self._select_miner(miners)
        deal = self.client.client.deal(
            cid=cid,
            miner=selected_miner,
            price="0.000000001",  # AttoFIL per epoch
            duration=duration,
        )
        return {
            "deal_id": deal["DealID"],
            "miner": selected_miner,
            "status": deal["State"],
        }

    def _select_miner(self, miners: List[str]) -> str:
        """Pick the highest-scoring of the first 10 candidate miners."""
        best_miner = None
        best_score = 0
        for miner in miners[:10]:
            try:
                info = self.client.state.miner_info(miner)
                score = self._calculate_miner_score(info)
                if score > best_score:
                    best_score = score
                    best_miner = miner
            except Exception:
                # Narrowed from bare except: skip miners whose info
                # lookup fails, keep evaluating the rest.
                continue
        return best_miner or miners[0]

    def _calculate_miner_score(self, info: Dict) -> float:
        """Weighted reputation score: power 50%, deal history 30%, age 20%."""
        score = 0
        score += info.get("Power", 0) * 0.5              # storage capacity
        score += info.get("SuccessfulDeals", 0) * 0.3    # deal track record
        score += min(info.get("Age", 0) / 365, 5) * 0.2  # years in network, capped at 5
        return score

    def retrieve_data(self, cid: str, output_path: str):
        """Stream a retrieval deal for `cid` into `output_path`."""
        deal = self.client.client.retrieve(cid)
        with open(output_path, 'wb') as f:
            for chunk in deal:
                f.write(chunk)

    def get_deal_status(self, deal_id: int) -> Dict:
        """Return the current on-chain state of a deal."""
        deal = self.client.client.get_deal(deal_id)
        return {
            "deal_id": deal_id,
            "state": deal["State"],
            "provider": deal["Provider"],
            "piece_cid": deal["PieceCID"],
            "size": deal["Size"],
            "price": deal["Price"],
            "duration": deal["Duration"],
            "start_epoch": deal["StartEpoch"],
        }
class StoragePricing:
    """Estimate Filecoin storage costs from network-average pricing."""

    @staticmethod
    def estimate_cost(size_gb: float,
                      duration_days: int) -> Dict:
        """Estimate the cost of storing `size_gb` for `duration_days`.

        Fix: the epoch count referenced an undefined name
        (`epochs_per_epoch`) and raised NameError; it now multiplies by
        epochs_per_day. A zero-day duration no longer divides by zero in
        the monthly figure.
        """
        avg_price_per_epoch = 0.0000001  # FIL per GiB per epoch (network average)
        epochs_per_day = 2880            # one epoch every 30 seconds
        total_epochs = duration_days * epochs_per_day
        cost_fil = size_gb * avg_price_per_epoch * total_epochs
        cost_usd = cost_fil * 5          # assuming $5/FIL
        months = duration_days / 30
        return {
            "size_gb": size_gb,
            "duration_days": duration_days,
            "cost_fil": cost_fil,
            "cost_usd": cost_usd,
            "per_month_usd": cost_usd / months if months else 0.0,
        }
Decentralized Database
#!/usr/bin/env python3
"""OrbitDB - Decentralized database."""
from orbitdb import OrbitDB
import asyncio
class DecentralizedDatabase:
    """Manage a set of OrbitDB decentralized stores over one IPFS node."""

    def __init__(self, ipfs_node):
        """Remember the IPFS node handle; stores are opened in connect().

        Fix: the node was previously discarded, leaving connect() to
        reference an undefined global `ipfs_node` (NameError).
        """
        self.ipfs_node = ipfs_node
        self.db = None

    async def connect(self, database_name: str):
        """Open key-value, document, feed and counter stores under one name."""
        orbitdb = await OrbitDB.create_instance(self.ipfs_node)
        # Key-value store
        self.kv_db = await orbitdb.kvstore(database_name)
        # Document store
        self.doc_db = await orbitdb.docstore(f"{database_name}_docs")
        # Append-only log
        self.feed_db = await orbitdb.feed(f"{database_name}_feed")
        # CRDT counter
        self.counter_db = await orbitdb.counter(f"{database_name}_counter")

    async def put_key_value(self, key: str, value):
        """Store a key-value pair."""
        await self.kv_db.put(key, value)

    async def get_key_value(self, key: str):
        """Retrieve a value by key (local read, not awaited)."""
        return self.kv_db.get(key)

    async def put_document(self, doc: dict):
        """Store a document in the document store."""
        await self.doc_db.put(doc)

    async def query_documents(self, query: dict):
        """Return documents whose fields all equal the query dict's values."""
        return self.doc_db.query(
            lambda doc: all(doc.get(k) == v for k, v in query.items())
        )

    async def append_feed(self, data):
        """Append an entry to the feed."""
        await self.feed_db.add(data)

    async def get_all_feed(self):
        """Return all feed entries as a list."""
        return list(self.feed_db.iterator())

    async def increment_counter(self):
        """Increment the counter."""
        await self.counter_db.inc()

    async def get_counter(self):
        """Return the current counter value."""
        return self.counter_db.value
class CRDTDataStructure:
    """CRDT (Conflict-free Replicated Data Types)."""

    class GCounter:
        """Grow-only counter: one count per node, merged element-wise by max."""

        def __init__(self, node_id):
            self.node_id = node_id
            self.counts = {}

        def increment(self):
            """Bump this node's own count by one."""
            current = self.counts.get(self.node_id, 0)
            self.counts[self.node_id] = current + 1

        def value(self):
            """Total across all nodes."""
            return sum(self.counts.values())

        def merge(self, other):
            """Fold another replica in, keeping the larger count per node."""
            for peer, peer_count in other.counts.items():
                mine = self.counts.get(peer, 0)
                self.counts[peer] = peer_count if peer_count > mine else mine

    class LWWRegister:
        """Last-writer-wins register ordered by wall-clock timestamp."""

        def __init__(self):
            self.value = None
            self.timestamp = 0
            self.node_id = None

        def set(self, value, node_id):
            """Record the write only if it is strictly newer than the last one."""
            import time
            now = time.time()
            if now <= self.timestamp:
                return
            self.value = value
            self.timestamp = now
            self.node_id = node_id

        def get(self):
            """Current winning value."""
            return self.value

    class ORSet:
        """Observed-remove set: each element carries the node tags that added it."""

        def __init__(self):
            self.elements = {}  # element -> set of node_ids that added it

        def add(self, element, node_id):
            """Tag the element with the adding node."""
            self.elements.setdefault(element, set()).add(node_id)

        def remove(self, element, node_id):
            """Drop this node's tag; the element disappears once untagged."""
            tags = self.elements.get(element)
            if tags is None:
                return
            tags.discard(node_id)
            if not tags:
                del self.elements[element]

        def get(self):
            """Set of currently present elements."""
            return set(self.elements)
P2P Protocols
#!/usr/bin/env python3
"""Bitswap - IPFS's data transfer protocol."""
class BitswapMessage:
    """Bitswap wire message: a wantlist plus any block payloads being sent."""

    def __init__(self):
        self.want_list = []  # block requests to send to the peer
        self.blocks = []     # block payloads included in this message
        self.pending = []
        self.full = False    # True when the wantlist replaces peer state entirely

    def add_want(self, cid: str, priority: int = 1):
        """Queue a block request on the wantlist."""
        self.want_list.append({
            "cid": cid,
            "priority": priority,
            "cancel": False,
            "send_dont_have": True
        })

    def add_block(self, cid: str, data: bytes):
        """Attach a block payload (with size/checksum metadata) to the message."""
        self.blocks.append({
            "cid": cid,
            "data": data,
            "metadata": {
                "block_size": len(data),
                "checksum": self._calculate_checksum(data)
            }
        })

    def _calculate_checksum(self, data: bytes) -> bytes:
        """SHA-256 digest of the block payload."""
        import hashlib
        return hashlib.sha256(data).digest()

    def serialize(self) -> bytes:
        """Serialize the message via the generated protobuf bindings.

        Fix: the original imported a nonexistent `protobuf` module and then
        used `bitswap_pb2` without importing it (NameError). The generated
        module is now imported directly, with a clear error when missing.
        """
        try:
            import bitswap_pb2  # generated from bitswap.proto via protoc
        except ImportError as err:
            raise RuntimeError(
                "bitswap_pb2 bindings are required for serialization; "
                "generate them with protoc from bitswap.proto"
            ) from err
        message = bitswap_pb2.Message()
        for want in self.want_list:
            entry = message.wantlist.add()
            entry.block = want["cid"]
            entry.priority = want["priority"]
            entry.cancel = want["cancel"]
        for block in self.blocks:
            data = message.blocks.add()
            data.prefix = block["cid"][:8]  # CID prefix
            data.data = block["data"]
        return message.SerializeToString()
class DHTClient:
    """Simplified Kademlia-style Distributed Hash Table client."""

    K = 20      # bucket size / replication factor
    ALPHA = 3   # number of parallel lookups per round

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.routing_table = {}  # known peer id -> contact info
        self.local_store = {}    # locally cached key -> value

    def find_peers(self, key: str) -> List[str]:
        """Iteratively query known peers to discover those closest to `key`."""
        closest = self._get_closest(key, self.K)
        queried = set()
        to_query = closest[:self.ALPHA]
        while to_query:
            node = to_query.pop(0)
            if node in queried:
                continue
            queried.add(node)
            # Ask the peer for nodes even closer to the key.
            nearer = self._query_peer(node, key)
            for p in nearer:
                if p not in queried:
                    to_query.append(p)
        return list(queried)

    def _get_closest(self, key: str, count: int) -> List[str]:
        """Return up to `count` known nodes sorted by XOR distance to `key`."""
        distances = []
        for node_id in self.routing_table.keys():
            dist = self._xor_distance(key, node_id)
            distances.append((dist, node_id))
        distances.sort()
        return [n for _, n in distances[:count]]

    def _xor_distance(self, key1: str, key2: str) -> int:
        """Kademlia XOR metric over the keys' UTF-8 bytes.

        Fix: the original imported a nonexistent `intset` module and always
        crashed; int.from_bytes gives the integer form directly.
        NOTE(review): real Kademlia XORs fixed-length hashed IDs; here
        unequal-length keys compare by raw big-endian integer value.
        """
        k1 = int.from_bytes(key1.encode(), "big")
        k2 = int.from_bytes(key2.encode(), "big")
        return k1 ^ k2

    def _query_peer(self, node: str, key: str) -> List[str]:
        """Ask a peer for closer nodes (stub: network call in practice)."""
        return []

    def put(self, key: str, value: bytes):
        """Replicate `value` onto the K peers closest to `key`."""
        peers = self.find_peers(key)
        for peer in peers[:self.K]:
            self._store_on_peer(peer, key, value)

    def get(self, key: str) -> bytes:
        """Fetch the value for `key`; local cache first, then ALPHA peers.

        Returns None when no peer has the value.
        """
        if key in self.local_store:
            return self.local_store[key]
        peers = self.find_peers(key)
        for peer in peers[:self.ALPHA]:
            value = self._get_from_peer(peer, key)
            if value:
                self.local_store[key] = value  # cache for future lookups
                return value
        return None

    def _store_on_peer(self, peer: str, key: str, value: bytes):
        """Store on peer (stub: network call in practice)."""
        pass

    def _get_from_peer(self, peer: str, key: str) -> bytes:
        """Get from peer (stub: network call in practice)."""
        pass
External Resources
Related Articles
- Edge Computing: Cloudflare Workers, AWS Lambda@Edge
- WebAssembly (WASM): Production Deployment Patterns
- Quantum Computing: Algorithms and Simulators
Comments