
Decentralized Systems: IPFS, Distributed Storage, and DWeb

Introduction

Decentralized systems eliminate central points of failure and enable trustless data sharing. This article covers IPFS, distributed storage, and decentralized web technologies.

Key Statistics:

  • IPFS nodes: 100K+ active peers
  • Filecoin network: 10+ EiB of raw storage capacity
  • DWeb ecosystem: growing rapidly
  • Decentralized storage: often far cheaper than centralized cloud, though prices vary by provider and workload

Decentralized Architecture

┌──────────────────────────────────────────────────────────────┐
│              Decentralized Storage Architecture              │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  Traditional (Centralized)                                   │
│  ├── Single point of failure                                 │
│  ├── Vendor lock-in                                          │
│  ├── Slow at scale                                           │
│  └── Expensive at scale                                      │
│                                                              │
│  Decentralized                                               │
│  ├── No single point of failure                              │
│  ├── Vendor neutral                                          │
│  ├── Fast via nearby caching                                 │
│  └── Cheaper at scale                                        │
│                                                              │
│  Key Technologies                                            │
│  ├── Content addressing (CID)                                │
│  ├── Merkle DAG (Directed Acyclic Graph)                     │
│  ├── DHT (Distributed Hash Table)                            │
│  ├── P2P protocols (Bitswap)                                 │
│  └── Cryptographic verification                              │
│                                                              │
│  Storage Layers                                              │
│  ├── Hot: IPFS, Swarm                                        │
│  ├── Warm: Filecoin, Arweave                                 │
│  └── Cold: Crust, Sia                                        │
│                                                              │
└──────────────────────────────────────────────────────────────┘
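Content addressing, the first of these key technologies, can be demonstrated with nothing but a hash function: the address is derived from the bytes themselves, so identical content always resolves to the same identifier no matter who stores it or where. A minimal sketch (real CIDs add multihash and multicodec framing on top of the raw digest):

```python
import hashlib

def content_address(data: bytes) -> str:
    """Derive an address from the content alone (simplified: raw SHA-256)."""
    return hashlib.sha256(data).hexdigest()

# The same bytes always yield the same address, regardless of which
# node serves them -- the basis of deduplication and verification
a = content_address(b"hello, dweb")
b = content_address(b"hello, dweb")
c = content_address(b"hello, dWeb")

print(a == b)  # True: identical content, identical address
print(a == c)  # False: any change produces a different address
```

This is also why a peer can fetch a block from any untrusted node: re-hashing the received bytes and comparing against the requested address verifies the content end to end.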

IPFS Implementation

#!/usr/bin/env python3
"""IPFS interaction with Python."""

import hashlib

import ipfshttpclient

class IPFSManager:
    """Manage IPFS operations."""
    
    def __init__(self, api_endpoint: str = "/ip4/127.0.0.1/tcp/5001"):
        try:
            self.client = ipfshttpclient.connect(api_endpoint)
        except Exception as e:
            print(f"IPFS not available: {e}")
            self.client = None
    
    def add_file(self, file_path: str) -> dict:
        """Add file to IPFS."""
        
        if not self.client:
            return {"error": "IPFS not connected"}
        
        result = self.client.add(file_path)
        
        return {
            "cid": result["Hash"],
            "name": result["Name"],
            "size": result["Size"]
        }
    
    def add_directory(self, dir_path: str) -> dict:
        """Add directory recursively."""
        
        if not self.client:
            return {"error": "IPFS not connected"}
        
        # A recursive add returns one entry per file/subdirectory;
        # the last entry is the root directory itself
        results = self.client.add(dir_path, recursive=True)
        root = results[-1]
        
        return {
            "cid": root["Hash"],
            "name": root["Name"],
            "size": root["Size"]
        }
    
    def cat_file(self, cid: str) -> bytes:
        """Get file content from IPFS."""
        
        if not self.client:
            return b""
        
        return self.client.cat(cid)
    
    def pin_file(self, cid: str) -> bool:
        """Pin file to prevent garbage collection."""
        
        if not self.client:
            return False
        
        self.client.pin.add(cid)
        return True
    
    def get_stats(self, cid: str) -> dict:
        """Get file statistics."""
        
        if not self.client:
            return {}
        
        stat = self.client.files.stat(f"/ipfs/{cid}")
        
        return {
            "size": stat.get("Size"),
            "cumulative_size": stat.get("CumulativeSize"),
            "blocks": stat.get("Blocks"),
            "type": stat.get("Type")
        }
    
    def list_pins(self) -> list:
        """List pinned files."""
        
        if not self.client:
            return []
        
        # pin.ls() mirrors the HTTP API: {"Keys": {cid: {"Type": ...}}}
        pins = []
        for cid, info in self.client.pin.ls()["Keys"].items():
            pins.append({
                "cid": cid,
                "type": info["Type"]
            })
        
        return pins
    
    def create_ipns_record(self, cid: str, key_name: str = "self") -> str:
        """Create IPNS (InterPlanetary Name System) record."""
        
        if not self.client:
            return ""
        
        # Publish the IPFS path (not the bare CID) to IPNS under the given key
        result = self.client.name.publish(f"/ipfs/{cid}", key=key_name)
        
        return result["Name"]

class ContentAddressing:
    """Content addressing with CIDs."""
    
    @staticmethod
    def calculate_cid_v0(data: bytes) -> str:
        """Calculate CIDv0 (base58 multihash)."""
        
        import base58
        
        # SHA-256 hash
        digest = hashlib.sha256(data).digest()
        
        # Create multihash: <hash-func-id><digest-length><digest>
        multihash = bytes([0x12, 0x20]) + digest  # sha2-256 + 32 bytes
        
        # Base58 encode
        cid_v0 = base58.b58encode(multihash).decode()
        
        return cid_v0
    
    @staticmethod
    def calculate_cid_v1(data: bytes) -> str:
        """Calculate CIDv1 (multibase + multicodec)."""
        
        # Uses the py-multiformats package
        from multiformats import CID, multihash
        
        # sha2-256 multihash of the raw data
        digest = multihash.digest(data, "sha2-256")
        
        # CIDv1: base32 multibase, raw codec
        cid = CID("base32", 1, "raw", digest)
        
        return str(cid)
    
    @staticmethod
    def parse_cid(cid_string: str) -> dict:
        """Parse CID and extract components."""
        
        # Uses the py-multiformats package
        try:
            from multiformats import CID
            cid = CID.decode(cid_string)
            
            return {
                "version": cid.version,
                "codec": cid.codec.name,
                "hash_function": cid.hashfun.name,
                "digest_length": len(cid.raw_digest)
            }
        except Exception:
            return {}
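The CIDv0 construction can be sanity-checked without extra dependencies; a hand-rolled base58btc encoder is enough to see the familiar shape every sha2-256 CIDv0 shares (46 characters, starting with "Qm"). Note that this hashes the raw bytes directly, so the result will differ from what `ipfs add` prints, since IPFS hashes a UnixFS-wrapped block:

```python
import hashlib

B58_ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"

def b58encode(data: bytes) -> str:
    """Minimal base58btc encoder (no leading-zero handling needed here)."""
    n = int.from_bytes(data, "big")
    out = ""
    while n > 0:
        n, rem = divmod(n, 58)
        out = B58_ALPHABET[rem] + out
    return out

def cid_v0(data: bytes) -> str:
    digest = hashlib.sha256(data).digest()
    multihash = bytes([0x12, 0x20]) + digest  # sha2-256 code, 32-byte length
    return b58encode(multihash)

cid = cid_v0(b"hello dweb")
print(cid)        # e.g. "Qm..."
print(len(cid))   # 46
```

The fixed 0x12 0x20 prefix is why every CIDv0 encodes to the same leading characters: the multihash header is part of the encoded value.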

def create_merkle_dag():
    """Create a Merkle DAG structure via the MFS (Files) API."""
    
    client = ipfshttpclient.connect()
    
    # Target structure:
    # root/
    #   ├── dir1/
    #   │   └── file1.txt
    #   └── dir2/
    #       └── file2.txt
    
    # Add files first
    file1 = client.add("file1.txt")
    file2 = client.add("file2.txt")
    
    # Build the directory structure in MFS; cp sources must be IPFS paths
    client.files.mkdir("/root/dir1", parents=True)
    client.files.cp(f"/ipfs/{file1['Hash']}", "/root/dir1/file1.txt")
    
    client.files.mkdir("/root/dir2", parents=True)
    client.files.cp(f"/ipfs/{file2['Hash']}", "/root/dir2/file2.txt")
    
    # The root CID commits to the entire tree
    stat = client.files.stat("/root")
    print(f"Root CID: {stat['Hash']}")
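The defining property of a Merkle DAG is that each parent's identifier commits to its children's identifiers, so any change to a leaf propagates up to the root. A toy illustration of that propagation (real IPFS nodes use protobuf-encoded dag-pb blocks, not this ad-hoc encoding):

```python
import hashlib

def leaf(data: bytes) -> str:
    """Hash of a file's contents."""
    return hashlib.sha256(data).hexdigest()

def node(children: list) -> str:
    """A directory node's hash commits to its children's hashes."""
    return hashlib.sha256("|".join(children).encode()).hexdigest()

# root/ -> dir1/file1.txt, dir2/file2.txt
f1 = leaf(b"file1 contents")
f2 = leaf(b"file2 contents")
root = node([node([f1]), node([f2])])

# Editing file2 changes dir2's hash, which changes the root hash
f2b = leaf(b"file2 contents v2")
root2 = node([node([f1]), node([f2b])])

print(root != root2)  # True: the root CID pins the entire tree
```

This is why pinning a single root CID is enough to pin a whole directory tree, and why two trees that share subdirectories automatically deduplicate those blocks.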

Filecoin Storage

#!/usr/bin/env python3
"""Filecoin storage integration."""

# NOTE: `LotusClient` stands in for a Lotus JSON-RPC client wrapper;
# adapt the calls below to your actual Lotus API bindings.
from lotus import LotusClient
from typing import Dict, List

class FilecoinStorage:
    """Manage Filecoin storage deals."""
    
    def __init__(self, api_token: str, api_endpoint: str):
        self.client = LotusClient(api_endpoint, api_token)
    
    def create_deal(self, cid: str, 
                   duration: int = 180) -> Dict:
        """Create storage deal."""
        
        # Find available miners
        miners = self.client.state.list_miners()
        
        # Filter by reputation/size
        selected_miner = self._select_miner(miners)
        
        # Create deal proposal
        deal = self.client.client.deal(
            cid=cid,
            miner=selected_miner,
            price="0.000000001",  # price per GiB per epoch
            duration=duration,    # days
        )
        
        return {
            "deal_id": deal["DealID"],
            "miner": selected_miner,
            "status": deal["State"]
        }
    
    def _select_miner(self, miners: List[str]) -> str:
        """Select best miner based on criteria."""
        
        best_miner = None
        best_score = 0
        
        for miner in miners[:10]:  # Check top 10
            try:
                info = self.client.state.miner_info(miner)
                score = self._calculate_miner_score(info)
                
                if score > best_score:
                    best_score = score
                    best_miner = miner
            except Exception:
                continue
        
        return best_miner or miners[0]
    
    def _calculate_miner_score(self, info: Dict) -> float:
        """Calculate miner reputation score."""
        
        score = 0
        
        # Power (storage capacity)
        score += info.get("Power", 0) * 0.5
        
        # Reputation (number of successful deals)
        score += info.get("SuccessfulDeals", 0) * 0.3
        
        # Age (time in network)
        score += min(info.get("Age", 0) / 365, 5) * 0.2
        
        return score
    
    def retrieve_data(self, cid: str, output_path: str):
        """Retrieve data from Filecoin."""
        
        # Create retrieval deal
        deal = self.client.client.retrieve(cid)
        
        # Write to file
        with open(output_path, 'wb') as f:
            for chunk in deal:
                f.write(chunk)
    
    def get_deal_status(self, deal_id: int) -> Dict:
        """Get deal status."""
        
        deal = self.client.client.get_deal(deal_id)
        
        return {
            "deal_id": deal_id,
            "state": deal["State"],
            "provider": deal["Provider"],
            "piece_cid": deal["PieceCID"],
            "size": deal["Size"],
            "price": deal["Price"],
            "duration": deal["Duration"],
            "start_epoch": deal["StartEpoch"],
        }

class StoragePricing:
    """Calculate Filecoin storage costs."""
    
    @staticmethod
    def estimate_cost(size_gb: float, 
                     duration_days: int) -> Dict:
        """Estimate storage costs."""
        
        # Illustrative average price (FIL per GiB per epoch)
        avg_price_per_epoch = 0.0000001
        
        epochs_per_day = 2880  # one epoch every ~30 seconds
        
        total_epochs = duration_days * epochs_per_day
        
        # Calculate cost
        cost = size_gb * avg_price_per_epoch * total_epochs
        
        # Convert to FIL and USD (assuming $5/FIL)
        cost_fil = cost
        cost_usd = cost_fil * 5
        
        return {
            "size_gb": size_gb,
            "duration_days": duration_days,
            "cost_fil": cost_fil,
            "cost_usd": cost_usd,
            "per_month_usd": cost_usd / (duration_days / 30)
        }
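Plugging numbers into the estimate above: storing 100 GiB for 180 days at the illustrative rate works out as follows. All figures depend on the assumed price and the assumed $5/FIL exchange rate, so treat them as order-of-magnitude only:

```python
size_gb = 100
duration_days = 180
price_per_gib_epoch = 0.0000001   # illustrative FIL per GiB per epoch
epochs_per_day = 2880             # one 30-second epoch

total_epochs = duration_days * epochs_per_day   # 518,400 epochs
cost_fil = size_gb * price_per_gib_epoch * total_epochs
cost_usd = cost_fil * 5                          # assumed $5/FIL

print(f"{cost_fil:.3f} FIL")  # 5.184 FIL
print(f"${cost_usd:.2f}")     # $25.92
```

In practice deal prices are negotiated per miner and many miners accept verified deals at or near zero cost, so real quotes can be far below this.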

Decentralized Database

#!/usr/bin/env python3
"""OrbitDB - Decentralized database."""

# NOTE: OrbitDB is a JavaScript project; this assumes a Python binding
# exposing an equivalent async API.
from orbitdb import OrbitDB

class DecentralizedDatabase:
    """Manage OrbitDB decentralized databases."""
    
    def __init__(self, ipfs_node):
        self.ipfs_node = ipfs_node
    
    async def connect(self, database_name: str):
        """Connect to OrbitDB and open one database of each type."""
        
        # Create OrbitDB instance on top of the IPFS node
        orbitdb = await OrbitDB.create_instance(self.ipfs_node)
        
        # Create different types of databases
        # Key-Value store
        self.kv_db = await orbitdb.kvstore(database_name)
        
        # Document store
        self.doc_db = await orbitdb.docstore(f"{database_name}_docs")
        
        # Feed (append-only log)
        self.feed_db = await orbitdb.feed(f"{database_name}_feed")
        
        # Counter
        self.counter_db = await orbitdb.counter(f"{database_name}_counter")
    
    async def put_key_value(self, key: str, value):
        """Store key-value."""
        
        await self.kv_db.put(key, value)
    
    async def get_key_value(self, key: str):
        """Retrieve key-value."""
        
        return self.kv_db.get(key)
    
    async def put_document(self, doc: dict):
        """Store document."""
        
        await self.doc_db.put(doc)
    
    async def query_documents(self, query: dict):
        """Query documents."""
        
        return self.doc_db.query(lambda doc: 
            all(doc.get(k) == v for k, v in query.items())
        )
    
    async def append_feed(self, data):
        """Append to feed."""
        
        await self.feed_db.add(data)
    
    async def get_all_feed(self):
        """Get all feed entries."""
        
        return list(self.feed_db.iterator())
    
    async def increment_counter(self):
        """Increment counter."""
        
        await self.counter_db.inc()
    
    async def get_counter(self):
        """Get counter value."""
        
        return self.counter_db.value

class CRDTDataStructure:
    """CRDT (Conflict-free Replicated Data Types)."""
    
    # G-Counter (Grow-only counter)
    class GCounter:
        def __init__(self, node_id):
            self.node_id = node_id
            self.counts = {}
        
        def increment(self):
            self.counts[self.node_id] = \
                self.counts.get(self.node_id, 0) + 1
        
        def value(self):
            return sum(self.counts.values())
        
        def merge(self, other):
            for node_id, count in other.counts.items():
                self.counts[node_id] = max(
                    self.counts.get(node_id, 0), 
                    count
                )
    
    # LWW-Register (Last-Writer-Wins)
    class LWWRegister:
        def __init__(self):
            self.value = None
            self.timestamp = 0
            self.node_id = None
        
        def set(self, value, node_id):
            import time
            ts = time.time()
            
            # Later timestamp wins; equal timestamps are broken
            # deterministically by node_id so all replicas converge
            if ts > self.timestamp or (
                ts == self.timestamp and str(node_id) > str(self.node_id)
            ):
                self.value = value
                self.timestamp = ts
                self.node_id = node_id
        
        def get(self):
            return self.value
    
    # OR-Set (Observed-Remove Set)
    class ORSet:
        def __init__(self):
            self.elements = {}  # element -> set of node_ids
        
        def add(self, element, node_id):
            if element not in self.elements:
                self.elements[element] = set()
            self.elements[element].add(node_id)
        
        def remove(self, element, node_id):
            if element in self.elements:
                self.elements[element].discard(node_id)
                if not self.elements[element]:
                    del self.elements[element]
        
        def get(self):
            return set(self.elements.keys())
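The merge semantics above are what make CRDTs safe to replicate without coordination: two nodes can increment independently and, after exchanging state in any order, agree on the total. A quick demonstration with the grow-only counter (restated here so the snippet is self-contained):

```python
class GCounter:
    """Grow-only counter: per-node counts, merged by element-wise max."""
    def __init__(self, node_id):
        self.node_id = node_id
        self.counts = {}

    def increment(self):
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + 1

    def value(self):
        return sum(self.counts.values())

    def merge(self, other):
        # max() makes merge commutative, associative, and idempotent
        for node_id, count in other.counts.items():
            self.counts[node_id] = max(self.counts.get(node_id, 0), count)

a, b = GCounter("A"), GCounter("B")
a.increment(); a.increment()   # node A observed 2 increments
b.increment()                  # node B observed 1 increment

# Merge in both directions; order does not matter
a.merge(b)
b.merge(a)

print(a.value(), b.value())  # 3 3
```

Because merge is idempotent, delivering the same state twice (common in gossip protocols) never double-counts.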

P2P Protocols

#!/usr/bin/env python3
"""Bitswap and Kademlia DHT - IPFS data transfer and routing (simplified)."""

from typing import List

class BitswapMessage:
    """Bitswap message structure."""
    
    def __init__(self):
        self.want_list = []
        self.blocks = []
        self.pending = []
        self.full = False
    
    def add_want(self, cid: str, priority: int = 1):
        """Add block to want list."""
        
        self.want_list.append({
            "cid": cid,
            "priority": priority,
            "cancel": False,
            "send_dont_have": True
        })
    
    def add_block(self, cid: str, data: bytes):
        """Add block to message."""
        
        self.blocks.append({
            "cid": cid,
            "data": data,
            "metadata": {
                "block_size": len(data),
                "checksum": self._calculate_checksum(data)
            }
        })
    
    def _calculate_checksum(self, data: bytes) -> bytes:
        """Calculate block checksum."""
        
        import hashlib
        return hashlib.sha256(data).digest()
    
    def serialize(self) -> bytes:
        """Serialize message for transmission."""
        
        # Assumes a `bitswap_pb2` module generated from the Bitswap
        # protobuf schema with protoc
        import bitswap_pb2
        
        message = bitswap_pb2.Message()
        
        for want in self.want_list:
            entry = message.wantlist.add()
            entry.block = want["cid"]
            entry.priority = want["priority"]
            entry.cancel = want["cancel"]
        
        for block in self.blocks:
            data = message.blocks.add()
            data.prefix = block["cid"][:8]  # CID prefix
            data.data = block["data"]
        
        return message.SerializeToString()

class DHTClient:
    """Distributed Hash Table client."""
    
    # Kademlia DHT
    K = 20  # Size of bucket
    ALPHA = 3  # Parallel queries
    
    def __init__(self, node_id: str):
        self.node_id = node_id
        self.routing_table = {}
        self.local_store = {}
    
    def find_peers(self, key: str) -> List[str]:
        """Find peers closest to key."""
        
        # Get closest nodes from routing table
        closest = self._get_closest(key, self.K)
        
        # Query in parallel (alpha)
        queried = set()
        to_query = closest[:self.ALPHA]
        
        while to_query:
            node = to_query.pop(0)
            if node in queried:
                continue
            
            queried.add(node)
            
            # Query node for closer peers
            nearer = self._query_peer(node, key)
            
            # Add closer peers to query
            for p in nearer:
                if p not in queried:
                    to_query.append(p)
        
        return list(queried)
    
    def _get_closest(self, key: str, count: int) -> List[str]:
        """Get closest nodes to key."""
        
        distances = []
        
        for node_id in self.routing_table.keys():
            dist = self._xor_distance(key, node_id)
            distances.append((dist, node_id))
        
        distances.sort()
        return [n for _, n in distances[:count]]
    
    def _xor_distance(self, key1: str, key2: str) -> int:
        """Calculate XOR distance between keys (the Kademlia metric)."""
        
        import hashlib
        
        # Hash keys to fixed-width 256-bit integers, then XOR
        k1 = int.from_bytes(hashlib.sha256(key1.encode()).digest(), "big")
        k2 = int.from_bytes(hashlib.sha256(key2.encode()).digest(), "big")
        
        return k1 ^ k2
    
    def _query_peer(self, node: str, key: str) -> List[str]:
        """Query peer for closer nodes."""
        
        # In practice: network call to peer
        return []
    
    def put(self, key: str, value: bytes):
        """Store value in DHT."""
        
        # Find peers responsible for key
        peers = self.find_peers(key)
        
        # Store on k-closest peers
        for peer in peers[:self.K]:
            self._store_on_peer(peer, key, value)
    
    def get(self, key: str) -> bytes:
        """Retrieve value from DHT."""
        
        # Check local first
        if key in self.local_store:
            return self.local_store[key]
        
        # Find peers
        peers = self.find_peers(key)
        
        # Query peers
        for peer in peers[:self.ALPHA]:
            value = self._get_from_peer(peer, key)
            if value:
                self.local_store[key] = value
                return value
        
        return None
    
    def _store_on_peer(self, peer: str, key: str, value: bytes):
        """Store on peer (network call)."""
        pass
    
    def _get_from_peer(self, peer: str, key: str) -> bytes:
        """Get from peer (network call)."""
        pass
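The XOR metric that drives this lookup is a genuine distance function: it is symmetric, zero only between identical keys, and each routing hop at least halves the remaining distance. A standalone check, hashing string keys to 256-bit integers as `_xor_distance` does above:

```python
import hashlib

def xor_distance(key1: str, key2: str) -> int:
    """Kademlia XOR distance over a 256-bit hashed key space."""
    k1 = int.from_bytes(hashlib.sha256(key1.encode()).digest(), "big")
    k2 = int.from_bytes(hashlib.sha256(key2.encode()).digest(), "big")
    return k1 ^ k2

print(xor_distance("node-a", "node-a"))  # 0
print(xor_distance("node-a", "node-b") ==
      xor_distance("node-b", "node-a"))  # True: symmetric

# Unlike Euclidean distance, exactly one key lies at each distance
# from a given key, which makes k-bucket routing tables unambiguous
```

That last property (unidirectionality) is why Kademlia lookups for the same key converge along the same path and can be cached effectively.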
