Synchronization Architecture
Overview
The synchronization architecture in cpm distributes repositories across multiple servers using rsync-based incremental transfers, peer-to-peer neighbor synchronization, and pre-sync conflict detection. This document describes the sync mechanisms, protocols, and strategies cpm uses.
Synchronization Model
Client-Server Synchronization
Primary model for repository distribution:
             ┌──────────────┐
             │    Client    │
             │    (cpm)     │
             └───────┬──────┘
                     │
       ┌─────────────┴─────────────┐
       │                           │
       ▼                           ▼
┌──────────────┐            ┌──────────────┐
│ Main Server  │            │Backup Server │
│              │            │              │
│ - Repository │            │ - Repository │
│   store      │            │   store      │
│ - git daemon │            │ - git daemon │
└──────────────┘            └──────────────┘
Peer-to-Peer Synchronization
Neighbor-based distribution:
┌──────────────┐      ┌──────────────┐
│  Neighbor A  │◄────►│  Neighbor B  │
│    (cpm)     │      │    (cpm)     │
└──────┬───────┘      └──────┬───────┘
       │                     │
       │  ┌──────────────┐   │
       └─►│  Neighbor C  │◄──┘
          │    (cpm)     │
          └──────────────┘
Sync Manager Architecture
Location: internal/server/sync.go
Core Components
type SyncManager struct {
serverManager *ServerManager
repoBasePath string
}
type SyncResult struct {
RepoName string
TargetServer string
Success bool
Error error
Duration time.Duration
BytesSync int64
}
Sync Operations
Push (Client to Server)
┌─────────────────────────────────────────────┐
│ Push Synchronization Flow │
│ │
│ 1. Validate repository exists locally │
│ 2. Retrieve target server configuration │
│ 3. Establish SSH connection │
│ 4. Execute rsync transfer │
│ - Compare file size and mtime │
│ - Transfer differences │
│ - Delete removed files │
│ 5. Update database sync timestamp │
│ 6. Return sync result │
└─────────────────────────────────────────────┘
Implementation:
func (sm *SyncManager) SyncRepoTo(repoName, targetServer string) (*SyncResult, error) {
	startTime := time.Now()
	// Get server configuration
	server, err := sm.serverManager.Get(targetServer)
	if err != nil {
		return nil, err
	}
	// Build paths. The trailing slashes make rsync copy the repository's
	// contents instead of nesting a second repoName directory inside the
	// destination.
	localPath := fmt.Sprintf("%s/%s/", sm.repoBasePath, repoName)
	remotePath := fmt.Sprintf("%s@%s:%s/%s/",
		server.User, server.Host, sm.repoBasePath, repoName)
	// Execute rsync
	err = sm.executeRsync(localPath, remotePath, server)
	// The result is returned even on failure so callers can inspect it.
	return &SyncResult{
		RepoName:     repoName,
		TargetServer: targetServer,
		Success:      err == nil,
		Error:        err,
		Duration:     time.Since(startTime),
	}, err
}
Pull (Server to Client)
┌─────────────────────────────────────────────┐
│ Pull Synchronization Flow │
│ │
│ 1. Retrieve source server configuration │
│ 2. Check repository exists on server │
│ 3. Establish SSH connection │
│ 4. Execute rsync transfer │
│ - Receive file differences │
│ - Update local repository │
│ - Preserve git structure │
│ 5. Register/update in database │
│ 6. Return sync result │
└─────────────────────────────────────────────┘
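No pull implementation is shown here. The sketch below assumes pull mirrors the push path with source and destination swapped. `pullArgs` is a hypothetical helper, and the `Server` type is a stand-in limited to the fields the push code reads (`User`, `Host`, `Port`, `SSHKeyPath`).

```go
package main

import (
	"fmt"
	"path"
)

// Server is a stand-in for cpm's server configuration (assumed shape).
type Server struct {
	User, Host, SSHKeyPath string
	Port                   int
}

// pullArgs builds rsync arguments for a pull: the remote repository is the
// source and the local path is the destination. Hypothetical helper.
func pullArgs(basePath, repoName string, s Server) []string {
	ssh := "ssh"
	if s.SSHKeyPath != "" {
		ssh += " -i " + s.SSHKeyPath
	}
	if s.Port != 0 && s.Port != 22 {
		ssh += fmt.Sprintf(" -p %d", s.Port)
	}
	// Trailing slash: copy the directory's contents, not the directory itself.
	repo := path.Join(basePath, repoName) + "/"
	remote := fmt.Sprintf("%s@%s:%s", s.User, s.Host, repo)
	return []string{"-avz", "--delete", "-e", ssh, remote, repo}
}
```

Because `--delete` now applies to the local copy, a pull removes local files that no longer exist on the server, which is why the repository check in step 2 matters.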
rsync Protocol
Command Structure
rsync -avz --delete -e "ssh [options]" <source> <destination>
Options Breakdown
| Option | Purpose | Benefit |
|---|---|---|
| -a | Archive mode | Preserves permissions, timestamps, symlinks |
| -v | Verbose | Detailed transfer output |
| -z | Compression | Reduces bandwidth usage |
| --delete | Mirror deletion | Removes files not in source |
| -e ssh | SSH transport | Secure, encrypted transfer |
| --progress | Progress display | Real-time transfer status |
Incremental Transfer
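rsync combines a quick per-file check with a rolling-checksum delta algorithm:
1. List files in source and destination
2. For each file:
a. Compare size and modification time (or a full-file checksum with --checksum)
b. If different:
- Split the destination copy into blocks and checksum each block
- Use a rolling checksum to locate those blocks in the source copy
- Transfer only the data that does not match
c. If same:
- Skip transfer
3. Delete files not in source (if --delete)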
Benefits:
- Only transfers differences
- Reduces bandwidth usage by 80-95% on updates
- Fast for large repositories with small changes
- Reliable with built-in error checking
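To make the rolling checksum concrete, here is a minimal sketch of a weak checksum in the style of the rsync algorithm: two 16-bit sums that can be updated in constant time as the window slides by one byte. The type and function names are illustrative, and real rsync pairs this weak checksum with a strong hash before treating a block as matched.

```go
package main

// rolling holds the two 16-bit sums of an rsync-style weak checksum
// over a fixed-size window. Illustrative sketch, not rsync's own code.
type rolling struct {
	a, b uint32
	n    uint32 // window length in bytes
}

// newRolling computes the sums from scratch for one window.
func newRolling(window []byte) rolling {
	r := rolling{n: uint32(len(window))}
	for i, x := range window {
		r.a += uint32(x)
		r.b += (r.n - uint32(i)) * uint32(x)
	}
	r.a &= 0xffff
	r.b &= 0xffff
	return r
}

// roll slides the window one byte: out leaves at the front, in enters at
// the back. Unsigned wraparound is harmless because 2^16 divides 2^32.
func (r *rolling) roll(out, in byte) {
	r.a = (r.a - uint32(out) + uint32(in)) & 0xffff
	r.b = (r.b - r.n*uint32(out) + r.a) & 0xffff
}

// sum packs both halves into one 32-bit value.
func (r rolling) sum() uint32 { return r.a | r.b<<16 }
```

The constant-time update is what lets the sender test every byte offset of a file against the receiver's block checksums without rehashing each window.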
Transfer Performance
func (sm *SyncManager) executeRsync(source, destination string, server *Server) error {
	args := []string{
		"-avz",          // Archive, verbose, compress
		"--delete",      // Mirror deletions
		"--stats",       // Show statistics
		"--timeout=300", // Abort after 5 minutes without I/O
		"-e",            // SSH command follows
	}
	// Build SSH command. StrictHostKeyChecking=no skips host key
	// verification; remove it where known_hosts is managed.
	sshCmd := "ssh -o StrictHostKeyChecking=no"
	if server.SSHKeyPath != "" {
		sshCmd += fmt.Sprintf(" -i %s", server.SSHKeyPath)
	}
	if server.Port != 0 && server.Port != 22 {
		sshCmd += fmt.Sprintf(" -p %d", server.Port)
	}
	args = append(args, sshCmd, source, destination)
	cmd := exec.Command("rsync", args...)
	return cmd.Run()
}
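The `--stats` flag is passed above, but its output is discarded and `SyncResult.BytesSync` is never filled in. One possible way to recover the byte count is sketched below, assuming the rsync output contains a `Total transferred file size:` line (the exact wording and number formatting can vary between rsync versions). `transferredBytes` is a hypothetical helper.

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

// transferredBytes extracts the byte count from the
// "Total transferred file size:" line of rsync --stats output.
// The line format is an assumption about the installed rsync version.
func transferredBytes(stats string) (int64, error) {
	const prefix = "Total transferred file size:"
	sc := bufio.NewScanner(strings.NewReader(stats))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if !strings.HasPrefix(line, prefix) {
			continue
		}
		fields := strings.Fields(strings.TrimPrefix(line, prefix))
		if len(fields) == 0 {
			break
		}
		// Some rsync versions print thousands separators; strip them.
		return strconv.ParseInt(strings.ReplaceAll(fields[0], ",", ""), 10, 64)
	}
	return 0, fmt.Errorf("no %q line in rsync output", prefix)
}
```

To use it, `executeRsync` would need to capture stdout (for example with `cmd.Output()`) instead of calling `cmd.Run()`.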
Conflict Detection
Pre-Sync Validation
Before synchronizing, cpm checks for potential conflicts:
func (sm *SyncManager) detectConflicts(repoName string, direction string) ([]Conflict, error) {
var conflicts []Conflict
// Check for uncommitted changes
if hasUncommittedChanges(repoName) {
conflicts = append(conflicts, Conflict{
Type: "uncommitted_changes",
Description: "Repository has uncommitted changes",
Resolution: "Commit or stash changes before sync",
})
}
// Check for diverged branches
if hasDivergedBranches(repoName) {
conflicts = append(conflicts, Conflict{
Type: "diverged_branches",
Description: "Local and remote have diverged",
Resolution: "Merge or rebase before sync",
})
}
return conflicts, nil
}
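`hasDivergedBranches` is not shown in this document. One way such a check could work is to run `git rev-list --left-right --count local...remote`, which prints two tab-separated commit counts, and treat the branches as diverged when both are non-zero. The sketch below covers only the parsing step; `parseAheadBehind` and `diverged` are hypothetical names.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseAheadBehind parses the "<ahead>\t<behind>" output of
// `git rev-list --left-right --count local...remote`.
func parseAheadBehind(out string) (ahead, behind int, err error) {
	fields := strings.Fields(out)
	if len(fields) != 2 {
		return 0, 0, fmt.Errorf("unexpected rev-list output %q", out)
	}
	if ahead, err = strconv.Atoi(fields[0]); err != nil {
		return 0, 0, err
	}
	if behind, err = strconv.Atoi(fields[1]); err != nil {
		return 0, 0, err
	}
	return ahead, behind, nil
}

// diverged reports whether each side has commits the other lacks.
func diverged(ahead, behind int) bool { return ahead > 0 && behind > 0 }
```

A branch that is only ahead or only behind can be fast-forwarded, so only the case where both counts are positive needs the manual merge listed in the table below.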
Conflict Resolution Strategies
| Conflict Type | Strategy | Command Flag |
|---|---|---|
| Uncommitted changes | Abort or force | --force |
| Diverged history | Manual merge | N/A (manual) |
| File permissions | Preserve local | Default |
| Deletion conflicts | Mirror source | --delete |
Neighbor Synchronization
Discovery Protocol
┌────────────────────────────────────────────┐
│ Neighbor Discovery Process │
│ │
│ 1. Detect local network (CIDR) │
│ 2. Generate host list from CIDR │
│ 3. Concurrent TCP connection attempts │
│ - Port 9418 (cpm default) │
│ - 2 second timeout per host │
│ - 50 concurrent connections │
│ 4. Filter reachable hosts │
│ 5. Identify cpm servers │
│ 6. Optional: auto-register as neighbors │
└────────────────────────────────────────────┘
Implementation: internal/server/neighbor.go
func (nm *NeighborManager) Discover(network string) ([]DiscoveryResult, error) {
// Parse CIDR
ip, ipNet, err := net.ParseCIDR(network)
if err != nil {
return nil, err
}
// Generate hosts
hosts := nm.generateHosts(ip, ipNet)
// Scan concurrently
results := make([]DiscoveryResult, 0)
resultsChan := make(chan DiscoveryResult, len(hosts))
var wg sync.WaitGroup
semaphore := make(chan struct{}, 50) // Limit concurrency
for _, host := range hosts {
wg.Add(1)
go func(h string) {
defer wg.Done()
semaphore <- struct{}{}
defer func() { <-semaphore }()
result := nm.scanHost(h, DefaultGitmPort)
if result.Reachable {
resultsChan <- result
}
}(host)
}
// Collect results
go func() {
wg.Wait()
close(resultsChan)
}()
for result := range resultsChan {
results = append(results, result)
}
return results, nil
}
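`generateHosts` is called above but not shown. A minimal IPv4-only sketch of host enumeration follows; `hostsInCIDR` is a hypothetical stand-in, and the /16 size cap is an arbitrary safety limit chosen for this example.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"net"
)

// hostsInCIDR lists the usable IPv4 host addresses in a network, skipping
// the network and broadcast addresses. Hypothetical stand-in for
// generateHosts; IPv6 is rejected.
func hostsInCIDR(cidr string) ([]string, error) {
	_, ipNet, err := net.ParseCIDR(cidr)
	if err != nil {
		return nil, err
	}
	base := ipNet.IP.To4()
	ones, bits := ipNet.Mask.Size()
	if base == nil || bits != 32 {
		return nil, fmt.Errorf("%s is not an IPv4 network", cidr)
	}
	if ones < 16 {
		return nil, fmt.Errorf("%s is too large to scan", cidr)
	}
	size := uint32(1) << uint(32-ones)
	start := binary.BigEndian.Uint32(base)
	first, last := start+1, start+size-2
	if size <= 2 { // /31 and /32 have no network or broadcast address to skip
		first, last = start, start+size-1
	}
	hosts := make([]string, 0, last-first+1)
	for a := first; ; a++ {
		ip := make(net.IP, 4)
		binary.BigEndian.PutUint32(ip, a)
		hosts = append(hosts, ip.String())
		if a == last { // compare before incrementing to avoid uint32 overflow
			break
		}
	}
	return hosts, nil
}
```

For a /24 this yields 254 candidates, which the 50-slot semaphore above would scan in roughly six batches of 2-second timeouts in the worst case.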
Neighbor Sync Protocol
Direct peer-to-peer synchronization:
Neighbor A Neighbor B
│ │
│ 1. Ping (connectivity test) │
│────────────────────────────►│
│◄────────────────────────────│
│ │
│ 2. Sync request (repo name) │
│────────────────────────────►│
│ │
│ 3. rsync transfer │
│◄───────────────────────────►│
│ │
│ 4. Verify completion │
│◄────────────────────────────│
│ │
Sync Strategies
Full Sync
Complete repository transfer (initial sync):
rsync -avz --delete \
-e "ssh -i key" \
/local/repo.git/ \
user@host:/remote/repo.git/
Incremental Sync
Transfer only changes since last sync:
# rsync already transfers only changed files; --update also skips files that are newer on the receiver
rsync -avz --update \
-e "ssh -i key" \
/local/repo.git/ \
user@host:/remote/repo.git/
Selective Sync
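Sync specific branches or refs. Only loose ref files are filtered here; objects and packed-refs are still copied in full:
rsync -avz \
--include='refs/heads/' \
--include='refs/heads/main' \
--include='refs/heads/develop' \
--exclude='refs/**' \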
-e "ssh -i key" \
/local/repo.git/ \
user@host:/remote/repo.git/
Progress Tracking
Real-time Progress
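func (sm *SyncManager) SyncWithProgress(
	repoName, targetServer string,
	progressFn func(line string),
) (*SyncResult, error) {
	startTime := time.Now()
	server, err := sm.serverManager.Get(targetServer)
	if err != nil {
		return nil, err
	}
	// Trailing slashes make rsync copy the repository's contents.
	// --progress makes rsync report transfer status on stdout; SSH
	// options are omitted here for brevity.
	localPath := fmt.Sprintf("%s/%s/", sm.repoBasePath, repoName)
	remotePath := fmt.Sprintf("%s@%s:%s/%s/",
		server.User, server.Host, sm.repoBasePath, repoName)
	args := []string{"-avz", "--delete", "--progress", localPath, remotePath}
	cmd := exec.Command("rsync", args...)
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return nil, err
	}
	if err := cmd.Start(); err != nil {
		return nil, err
	}
	// Read progress line by line until rsync closes stdout
	scanner := bufio.NewScanner(stdout)
	for scanner.Scan() {
		if progressFn != nil {
			progressFn(scanner.Text())
		}
	}
	err = cmd.Wait()
	return &SyncResult{
		RepoName:     repoName,
		TargetServer: targetServer,
		Success:      err == nil,
		Error:        err,
		Duration:     time.Since(startTime),
	}, err
}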
Progress Display
Syncing repository 'webapp' to main server...
0% [ ] 0 B/s ETA: --:--
15% [### ] 12.5 MB/s ETA: 00:12
47% [######### ] 15.2 MB/s ETA: 00:06
73% [############## ] 16.8 MB/s ETA: 00:03
100% [####################] 15.9 MB/s
Files transferred: 892
Total size: 156.8 MB
Transfer time: 18.4s
Average speed: 8.5 MB/s
Sync Verification
Post-Sync Validation
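func (sm *SyncManager) VerifySync(repoName, targetServer string) (bool, error) {
	server, err := sm.serverManager.Get(targetServer)
	if err != nil {
		return false, err
	}
	// Trailing slashes make rsync compare the repository's contents.
	localPath := fmt.Sprintf("%s/%s/", sm.repoBasePath, repoName)
	remotePath := fmt.Sprintf("%s@%s:%s/%s/",
		server.User, server.Host, sm.repoBasePath, repoName)
	// Use rsync dry-run to check differences
	args := []string{
		"-avz",
		"--dry-run",
		"--delete",
		localPath,
		remotePath,
	}
	cmd := exec.Command("rsync", args...)
	output, err := cmd.CombinedOutput()
	if err != nil {
		return false, err
	}
	// Count output lines that name a file rsync would change
	lines := strings.Split(string(output), "\n")
	fileChanges := 0
	for _, line := range lines {
		if isFileLine(line) {
			fileChanges++
		}
	}
	return fileChanges == 0, nil
}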
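`isFileLine` is not defined in this document. A rough sketch is given below, assuming the usual shape of `rsync -av --dry-run` output: a file-list header, one line per changed path, then `sent ...` and `total size is ...` summary lines. Treating `./` as a non-change is a choice made for this example, and a file whose name begins with one of the filtered prefixes would be miscounted.

```go
package main

import "strings"

// isFileLine reports whether a line of `rsync -av --dry-run` output names a
// path that would change, rather than a header, summary, or blank line.
// Sketch only: the filtered prefixes are assumptions about rsync's output.
func isFileLine(line string) bool {
	line = strings.TrimSpace(line)
	// "./" only signals a metadata difference on the top-level directory.
	if line == "" || line == "./" {
		return false
	}
	for _, prefix := range []string{
		"sending incremental file list",
		"receiving incremental file list",
		"sent ",
		"total size is ",
	} {
		if strings.HasPrefix(line, prefix) {
			return false
		}
	}
	return true
}
```

Lines such as `deleting old/file` are counted as changes, which matches the intent of verifying that source and destination are identical.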
Performance Optimization
Bandwidth Optimization
- Compression: -z flag for on-the-fly compression
- Incremental: Only transfer differences
- Block-level: Transfer changed blocks within files
- Parallel: Multiple concurrent syncs for independent repos
Network Optimization
# Limit bandwidth
rsync --bwlimit=10000 ... # 10 MB/s limit
# Adjust compression level
rsync -z --compress-level=6 ... # Balance speed/compression
# Set the delta-transfer block size
rsync --block-size=8192 ... # Larger blocks mean fewer block checksums
Error Recovery
func syncWithRetry(sm *SyncManager, repo, server string, maxRetries int) error {
	var lastErr error
	for i := 0; i < maxRetries; i++ {
		// Only the error matters here; the SyncResult is discarded.
		_, err := sm.SyncRepoTo(repo, server)
		if err == nil {
			return nil
		}
		lastErr = err
		log.Printf("Sync attempt %d failed: %v", i+1, err)
		// Exponential backoff (1s, 2s, 4s, ...), skipped after the last attempt
		if i < maxRetries-1 {
			time.Sleep(time.Duration(1<<i) * time.Second)
		}
	}
	return fmt.Errorf("sync failed after %d retries: %w", maxRetries, lastErr)
}
Best Practices
Sync Frequency
- Critical repos: Every 15-30 minutes
- Active repos: Every 1-2 hours
- Stable repos: Daily
- Archive repos: Weekly
Sync Order
- Push to main server first
- Push to backup servers second
- Sync between neighbors last
- Verify each sync before proceeding
Error Handling
- Always validate repositories before sync
- Check disk space on target
- Monitor transfer times
- Log all sync operations
- Implement retry with backoff
- Alert on persistent failures
Security
- Use SSH keys exclusively
- Verify host keys
- Limit sync permissions
- Audit sync operations
- Encrypt sensitive repositories
Monitoring and Logging
Sync Metrics
type SyncMetrics struct {
TotalSyncs int64
SuccessfulSyncs int64
FailedSyncs int64
TotalBytes int64
AverageSpeed float64
LastSyncTime time.Time
}
Log Format
2024-01-15 10:30:00 [INFO] Sync started: repo=webapp, target=main-server
2024-01-15 10:30:18 [INFO] Sync completed: repo=webapp, duration=18.4s, size=156.8MB, speed=8.5MB/s
2024-01-15 10:30:18 [INFO] Files transferred: 892, bytes: 164,428,800