#!perl

use strict;
use warnings;
use threads;
use threads::shared;
use LWP::UserAgent;
use IO::Socket::INET;
use IO::Select;
use Digest::SHA qw(sha1);
use URI::Escape;
use File::Path qw(make_path);
use File::Basename;
use POSIX qw(ceil);

# Constants
use constant {
    BLOCK_SIZE      => 16384,    # 16 KB standard block
    PEER_TIMEOUT    => 30,       # seconds
    MAX_BACKLOG     => 5,        # pipelined requests per peer
    CONNECT_TIMEOUT => 10,       # seconds for TCP connect
    MAX_PEERS       => 30,       # max simultaneous peer connections
};

# Message IDs
use constant {
    MSG_CHOKE          => 0,
    MSG_UNCHOKE        => 1,
    MSG_INTERESTED     => 2,
    MSG_NOT_INTERESTED => 3,
    MSG_HAVE           => 4,
    MSG_BITFIELD       => 5,
    MSG_REQUEST        => 6,
    MSG_PIECE          => 7,
    MSG_CANCEL         => 8,
};

# ─── Bdecode ───────────────────────────────────────────────────────────────────

sub bdecode {
    my ($data) = @_;
    my $pos = 0;
    return _bdecode($data, \$pos);
}

sub _bdecode {
    my ($data, $pos_ref) = @_;
    my $len = length($data);
    die "Unexpected end of data at pos $$pos_ref" if $$pos_ref >= $len;

    my $char = substr($data, $$pos_ref, 1);

    if ($char eq 'i') {
        $$pos_ref++;
        my $end = index($data, 'e', $$pos_ref);
        die "Unterminated integer" if $end < 0;
        my $num = substr($data, $$pos_ref, $end - $$pos_ref);
        $$pos_ref = $end + 1;
        return 0 + $num;
    } elsif ($char eq 'l') {
        $$pos_ref++;
        my @list;
        while ($$pos_ref < $len && substr($data, $$pos_ref, 1) ne 'e') {
            push @list, _bdecode($data, $pos_ref);
        }
        $$pos_ref++;
        return \@list;
    } elsif ($char eq 'd') {
        $$pos_ref++;
        my %dict;
        while ($$pos_ref < $len && substr($data, $$pos_ref, 1) ne 'e') {
            my $key = _bdecode($data, $pos_ref);
            my $val = _bdecode($data, $pos_ref);
            $dict{$key} = $val;
        }
        $$pos_ref++;
        return \%dict;
    } elsif ($char ge '0' && $char le '9') {
        my $colon = index($data, ':', $$pos_ref);
        die "Expected ':' in string at pos $$pos_ref" if $colon < 0;
        my $slen = substr($data, $$pos_ref, $colon - $$pos_ref);
        $$pos_ref = $colon + 1;
        my $str = substr($data, $$pos_ref, $slen);
        $$pos_ref += $slen;
        return $str;
    } else {
        die "Invalid bencode character '$char' at pos $$pos_ref";
    }
}

# Extract the raw bytes of the 'info' value from the torrent file
# so the info_hash is computed from the original bytes, not re-encoded data.
sub extract_raw_info {
    my ($data) = @_;
    my $pos = 1;    # skip top-level 'd'
    my $len = length($data);

    while ($pos < $len && substr($data, $pos, 1) ne 'e') {
        my $key = _bdecode($data, \$pos);
        my $val_start = $pos;
        _bdecode($data, \$pos);
        if ($key eq 'info') {
            return substr($data, $val_start, $pos - $val_start);
        }
    }
    die "Could not find 'info' key in torrent file";
}

# ─── Bencode ───────────────────────────────────────────────────────────────────

sub bencode {
    my ($data) = @_;
    if (ref $data eq 'HASH') {
        my $str = 'd';
        for my $key (sort keys %$data) {
            $str .= bencode($key) . bencode($data->{$key});
        }
        return $str . 'e';
    } elsif (ref $data eq 'ARRAY') {
        my $str = 'l';
        $str .= bencode($_) for @$data;
        return $str . 'e';
    } else {
        return length($data) . ':' . $data;
    }
}

# ─── Peer ID ──────────────────────────────────────────────────────────────────

sub generate_peer_id {
    return '-PL0001-' . join('', map { chr(int(rand(256))) } 1 .. 12);
}

# ─── Bitfield Helpers (MSB-first per BitTorrent spec) ─────────────────────────
# BitTorrent protocol: high bit of first byte = piece 0
# Perl's vec() uses LSB-first, so we use manual bit ops instead.

sub bf_has {
    my ($bf, $idx) = @_;
    my $byte_idx = int($idx / 8);
    my $bit = 7 - ($idx % 8);
    return 0 if $byte_idx >= length($bf);
    return (ord(substr($bf, $byte_idx, 1)) >> $bit) & 1;
}

sub bf_set {
    my ($bf_ref, $idx) = @_;
    my $byte_idx = int($idx / 8);
    my $bit = 7 - ($idx % 8);
    return if $byte_idx >= length($$bf_ref);
    substr($$bf_ref, $byte_idx, 1) =
        chr(ord(substr($$bf_ref, $byte_idx, 1)) | (1 << $bit));
}

# ─── Compact Peers Parsing ────────────────────────────────────────────────────

sub parse_compact_peers {
    my ($data) = @_;
    my @peers;
    return \@peers unless defined $data;

    if (ref $data eq 'ARRAY') {
        for my $p (@$data) {
            push @peers, { ip => $p->{ip}, port => $p->{port} };
        }
    } else {
        for (my $i = 0; $i + 6 <= length($data); $i += 6) {
            my @ip   = unpack('C4', substr($data, $i, 4));
            my $port = unpack('n',  substr($data, $i + 4, 2));
            push @peers, { ip => join('.', @ip), port => $port };
        }
    }
    return \@peers;
}

# ─── Tracker Communication ────────────────────────────────────────────────────

sub contact_tracker {
    my ($announce, $info_hash, $peer_id, $left, $event) = @_;

    my $sep = ($announce =~ /\?/) ? '&' : '?';
    my $url = $announce . $sep
        . 'info_hash='  . uri_escape($info_hash, "^A-Za-z0-9\\-_.!~*'()")
        . '&peer_id='   . uri_escape($peer_id,   "^A-Za-z0-9\\-_.!~*'()")
        . '&port=6881'
        . '&uploaded=0'
        . '&downloaded=0'
        . '&left=' . $left
        . '&compact=1';
    $url .= "&event=$event" if $event;

    print "  Contacting: $announce\n";

    my $ua   = LWP::UserAgent->new(timeout => 15);
    my $resp = $ua->get($url);
    unless ($resp->is_success) {
        warn "  Tracker request failed: " . $resp->status_line . "\n";
        return undef;
    }

    my $decoded = eval { bdecode($resp->content) };
    if ($@) {
        warn "  Failed to decode tracker response: $@\n";
        return undef;
    }
    if ($decoded->{'failure reason'}) {
        warn "  Tracker failure: " . $decoded->{'failure reason'} . "\n";
        return undef;
    }
    return $decoded;
}

# ─── Socket I/O ───────────────────────────────────────────────────────────────

sub read_exactly {
    my ($socket, $length) = @_;
    my $sel  = IO::Select->new($socket);
    my $data = '';
    my $remaining = $length;

    while ($remaining > 0) {
        return undef unless $sel->can_read(PEER_TIMEOUT);
        my $buf;
        my $n = sysread($socket, $buf, $remaining);
        return undef unless defined $n && $n > 0;
        $data .= $buf;
        $remaining -= $n;
    }
    return $data;
}

# ─── Peer Wire Protocol ──────────────────────────────────────────────────────

sub do_handshake {
    my ($socket, $info_hash, $peer_id) = @_;

    my $pstr      = 'BitTorrent protocol';
    my $handshake = chr(length($pstr)) . $pstr . ("\0" x 8) . $info_hash . $peer_id;

    my $n = syswrite($socket, $handshake);
    return undef unless defined $n && $n == length($handshake);

    my $response = read_exactly($socket, 68);
    return undef unless defined $response && length($response) == 68;

    my $resp_pstr = substr($response, 1, 19);
    return undef unless $resp_pstr eq 'BitTorrent protocol';

    my $resp_info_hash = substr($response, 28, 20);
    unless ($resp_info_hash eq $info_hash) {
        warn "  Info hash mismatch in handshake\n";
        return undef;
    }
    return substr($response, 48, 20);    # peer's peer_id
}

sub send_msg {
    my ($socket, $id, $payload) = @_;
    $payload //= '';
    my $msg     = pack('N', 1 + length($payload)) . chr($id) . $payload;
    my $written = 0;
    while ($written < length($msg)) {
        my $n = syswrite($socket, $msg, length($msg) - $written, $written);
        return 0 unless defined $n;
        $written += $n;
    }
    return 1;
}

sub read_msg {
    my ($socket) = @_;

    my $len_buf = read_exactly($socket, 4);
    return undef unless defined $len_buf;

    my $len = unpack('N', $len_buf);
    return [ undef, '' ] if $len == 0;    # keep-alive

    # Sanity check: reject absurdly large messages
    return undef if $len > 2 * BLOCK_SIZE + 64;

    my $msg = read_exactly($socket, $len);
    return undef unless defined $msg && length($msg) == $len;

    return [ ord(substr($msg, 0, 1)), substr($msg, 1) ];
}

# ─── Piece Helpers ────────────────────────────────────────────────────────────

sub get_piece_length {
    my ($index, $nominal, $total) = @_;
    my $start     = $index * $nominal;
    my $remaining = $total - $start;
    return $remaining < $nominal ? $remaining : $nominal;
}

sub get_piece_hashes {
    my ($pieces_str) = @_;
    my @h;
    for (my $i = 0; $i < length($pieces_str); $i += 20) {
        push @h, substr($pieces_str, $i, 20);
    }
    return \@h;
}

# ─── Download a single piece with pipelined requests ─────────────────────────

sub download_piece {
    my ($socket, $piece_idx, $piece_length, $total_size,
        $peer_bf_ref, $peer_choking_ref) = @_;

    my $plen    = get_piece_length($piece_idx, $piece_length, $total_size);
    my $nblocks = ceil($plen / BLOCK_SIZE);
    my $requested = 0;
    my @block_done = (0) x $nblocks;
    my $received   = 0;
    my $piece_data = "\0" x $plen;

    while ($received < $nblocks) {
        # Send pipelined requests up to MAX_BACKLOG
        while (!$$peer_choking_ref
               && $requested < $nblocks
               && ($requested - $received) < MAX_BACKLOG) {
            my $offset = $requested * BLOCK_SIZE;
            my $blen = ($offset + BLOCK_SIZE > $plen)
                ? ($plen - $offset) : BLOCK_SIZE;
            send_msg($socket, MSG_REQUEST,
                     pack('NNN', $piece_idx, $offset, $blen));
            $requested++;
        }

        # Receive a message
        my $msg = read_msg($socket);
        return (0, undef) unless defined $msg;

        my ($id, $payload) = @$msg;
        next unless defined $id;    # keep-alive

        if ($id == MSG_CHOKE) {
            $$peer_choking_ref = 1;
            return (0, undef);

        } elsif ($id == MSG_UNCHOKE) {
            $$peer_choking_ref = 0;

        } elsif ($id == MSG_HAVE) {
            if (length($payload) >= 4) {
                my $idx = unpack('N', $payload);
                bf_set($peer_bf_ref, $idx);
            }

        } elsif ($id == MSG_BITFIELD) {
            $$peer_bf_ref = $payload;

        } elsif ($id == MSG_PIECE) {
            next if length($payload) < 8;
            my $idx   = unpack('N', substr($payload, 0, 4));
            my $begin = unpack('N', substr($payload, 4, 4));
            my $block = substr($payload, 8);

            if ($idx == $piece_idx && $begin + length($block) <= $plen) {
                my $block_idx = int($begin / BLOCK_SIZE);
                # Skip duplicate blocks
                next if $block_idx < $nblocks && $block_done[$block_idx];
                substr($piece_data, $begin, length($block)) = $block;
                if ($block_idx < $nblocks) {
                    $block_done[$block_idx] = 1;
                    $received++;
                }
            }
        }
    }

    return (1, $piece_data);
}

# ─── Write piece data to files (handles multi-file spanning) ─────────────────
# Uses sysseek/syswrite for thread safety (no buffering, position-independent).

sub write_piece_to_files {
    my ($piece_idx, $piece_data, $piece_length, $files, $file_handles) = @_;

    my $global_offset = $piece_idx * $piece_length;
    my $data_len      = length($piece_data);
    my $written       = 0;

    for my $i (0 .. $#$files) {
        last if $written >= $data_len;

        my $f      = $files->[$i];
        my $fstart = $f->{offset};
        my $fend   = $fstart + $f->{length};

        next if $global_offset + $written >= $fend;
        last if $global_offset + $data_len <= $fstart;

        my $write_start = ($global_offset + $written > $fstart)
            ? ($global_offset + $written) : $fstart;
        my $write_end = ($global_offset + $data_len < $fend)
            ? ($global_offset + $data_len) : $fend;
        my $file_off = $write_start - $fstart;
        my $data_off = $write_start - $global_offset;
        my $wlen     = $write_end - $write_start;

        my $fh = $file_handles->[$i];
        sysseek($fh, $file_off, 0);
        syswrite($fh, $piece_data, $wlen, $data_off);

        $written = $write_end - $global_offset;
    }
}

# ─── Formatting ───────────────────────────────────────────────────────────────

sub format_size {
    my ($b) = @_;
    return sprintf("%.2f GB", $b / 1073741824) if $b >= 1073741824;
    return sprintf("%.2f MB", $b / 1048576)    if $b >= 1048576;
    return sprintf("%.2f KB", $b / 1024)       if $b >= 1024;
    return "$b B";
}

# ─── Thread-Safe Shared State ────────────────────────────────────────────────

my @g_have    :shared;    # 1 if piece is verified and written
my @g_pending :shared;    # 1 if piece is being downloaded by a thread
my $g_done    :shared = 0;
my $g_dl      :shared = 0;    # bytes downloaded
my $g_lock    :shared;        # mutex for piece selection

# ─── Peer Worker (runs in its own thread) ────────────────────────────────────

sub peer_worker {
    my ($ip, $port, $info_hash, $peer_id, $piece_length, $total_size,
        $piece_hashes, $num_pieces, $files, $output_dir, $start_time) = @_;

    my $tag = "$ip:$port";

    # Early exit if download is already complete
    { lock($g_lock); return if $g_done >= $num_pieces; }

    # Each thread opens its own file handles for writing
    my @fhs;
    for my $file (@$files) {
        my $path = "$output_dir/$file->{path}";
        open my $fh, '+<:raw', $path or do {
            warn "  [$tag] Cannot open $path: $!\n";
            return;
        };
        push @fhs, $fh;
    }

    # Connect
    my $socket = IO::Socket::INET->new(
        PeerAddr => $ip,
        PeerPort => $port,
        Proto    => 'tcp',
        Timeout  => CONNECT_TIMEOUT,
    );
    unless ($socket) {
        print "  [$tag] connect failed\n";
        close $_ for @fhs;
        return;
    }
    binmode($socket);

    # Handshake
    my $rpid = do_handshake($socket, $info_hash, $peer_id);
    unless ($rpid) {
        print "  [$tag] handshake failed\n";
        close $socket;
        close $_ for @fhs;
        return;
    }
    print "  [$tag] connected\n";

    # Per-peer state
    my $peer_choking = 1;
    my $peer_bf      = "\0" x ceil($num_pieces / 8);

    # Send interested
    send_msg($socket, MSG_INTERESTED, '');

    # Wait for unchoke (up to 50 messages)
    eval {
        for (1 .. 50) {
            last unless $peer_choking;
            my $msg = read_msg($socket);
            last unless defined $msg;
            my ($id, $payload) = @$msg;
            next unless defined $id;

            if ($id == MSG_UNCHOKE) {
                $peer_choking = 0;
            } elsif ($id == MSG_BITFIELD) {
                $peer_bf = $payload;
            } elsif ($id == MSG_HAVE) {
                if (length($payload) >= 4) {
                    my $idx = unpack('N', $payload);
                    bf_set(\$peer_bf, $idx);
                }
            }
        }
    };

    if ($peer_choking) {
        print "  [$tag] never unchoked\n";
        close $socket;
        close $_ for @fhs;
        return;
    }

    # Download loop
    eval {
        while (1) {
            # Check if all pieces are done
            { lock($g_lock); last if $g_done >= $num_pieces; }

            # Thread-safe piece selection
            my $pidx;
            {
                lock($g_lock);
                for my $i (0 .. $num_pieces - 1) {
                    next if $g_have[$i] || $g_pending[$i];
                    if (bf_has($peer_bf, $i)) {
                        $g_pending[$i] = 1;
                        $pidx = $i;
                        last;
                    }
                }
            }
            unless (defined $pidx) {
                last;    # nothing available from this peer
            }

            my ($ok, $piece_data) = download_piece(
                $socket, $pidx, $piece_length, $total_size,
                \$peer_bf, \$peer_choking
            );

            unless ($ok) {
                { lock($g_lock); $g_pending[$pidx] = 0; }
                if ($peer_choking) {
                    print "  [$tag] choked during piece $pidx, waiting...\n";
                    for (1 .. 30) {
                        last unless $peer_choking;
                        my $msg = read_msg($socket);
                        last unless defined $msg;
                        my ($id, $payload) = @$msg;
                        next unless defined $id;
                        if ($id == MSG_UNCHOKE) {
                            $peer_choking = 0;
                        } elsif ($id == MSG_HAVE
                                 && length($payload) >= 4) {
                            my $idx = unpack('N', $payload);
                            bf_set(\$peer_bf, $idx);
                        }
                    }
                    next unless $peer_choking;
                }
                last;
            }

            # Verify SHA-1 hash
            my $plen  = get_piece_length($pidx, $piece_length, $total_size);
            my $pdata = substr($piece_data, 0, $plen);
            my $hash  = sha1($pdata);

            if ($hash eq $piece_hashes->[$pidx]) {
                # Write piece to disk (each thread has its own file handles)
                write_piece_to_files($pidx, $pdata, $piece_length,
                                     $files, \@fhs);

                my ($cur_done, $cur_dl);
                {
                    lock($g_lock);
                    $g_have[$pidx]    = 1;
                    $g_pending[$pidx] = 0;
                    $g_done++;
                    $g_dl += $plen;
                    $cur_done = $g_done;
                    $cur_dl   = $g_dl;
                }

                my $elapsed = (time() - $start_time) || 1;
                my $speed   = $cur_dl / $elapsed;
                printf "  [%s] Piece %d/%d OK  [%.1f%%  %s/s]\n",
                    $tag, $cur_done, $num_pieces,
                    ($cur_done / $num_pieces) * 100,
                    format_size($speed);

                send_msg($socket, MSG_HAVE, pack('N', $pidx));
            } else {
                warn sprintf(
                    "  [%s] Piece %d HASH MISMATCH (got %s, expected %s)\n",
                    $tag, $pidx, unpack('H*', $hash),
                    unpack('H*', $piece_hashes->[$pidx])
                );
                { lock($g_lock); $g_pending[$pidx] = 0; }
            }
        }
    };
    warn "  [$tag] Error: $@\n" if $@;
    close $socket;
    close $_ for @fhs;

    my $d;
    { lock($g_lock); $d = $g_done; }
    print "  [$tag] Disconnected ($d/$num_pieces pieces done)\n";
}

# ─── Main ─────────────────────────────────────────────────────────────────────

sub main {
    my $torrent_file = $ARGV[0] or die "Usage: $0 <torrent_file> [output_dir]\n";
    my $output_dir   = $ARGV[1] // '.';

    # Read torrent file (binary mode)
    open my $fh, '<:raw', $torrent_file or die "Cannot open $torrent_file: $!\n";
    my $torrent_data = do { local $/; <$fh> };
    close $fh;

    my $torrent = bdecode($torrent_data);
    my $info    = $torrent->{info};

    # Compute info_hash from the original raw bytes
    my $raw_info  = extract_raw_info($torrent_data);
    my $info_hash = sha1($raw_info);
    my $peer_id   = generate_peer_id();

    my $piece_length = $info->{'piece length'};
    my $piece_hashes = get_piece_hashes($info->{pieces});
    my $num_pieces   = scalar @$piece_hashes;

    # Determine files and total size, with cumulative offsets
    my (@files, $total_size);
    my $base_name  = $info->{name};
    my $cum_offset = 0;

    if (exists $info->{files}) {
        # Multi-file torrent
        for my $f (@{ $info->{files} }) {
            my $path = join('/', @{ $f->{path} });
            push @files, {
                path   => "$base_name/$path",
                length => $f->{length},
                offset => $cum_offset,
            };
            $cum_offset += $f->{length};
        }
        $total_size = $cum_offset;
    } else {
        $total_size = $info->{length};
        push @files, { path => $base_name, length => $total_size, offset => 0 };
    }

    print "=" x 60, "\n";
    print "Torrent : $base_name\n";
    print "Size    : ", format_size($total_size), "\n";
    print "Pieces  : $num_pieces x ", format_size($piece_length), "\n";
    print "Files   : ", scalar(@files), "\n";
    print "Info hash: ", unpack('H*', $info_hash), "\n";
    print "=" x 60, "\n\n";

    # Pre-allocate output files
    for my $file (@files) {
        my $path = "$output_dir/$file->{path}";
        my $dir  = dirname($path);
        make_path($dir) unless -d $dir;

        open my $out, '+>:raw', $path or die "Cannot create $path: $!\n";
        if ($file->{length} > 0) {
            sysseek($out, $file->{length} - 1, 0);
            syswrite($out, "\0", 1);
        }
        close $out;
        print "Created: $path (", format_size($file->{length}), ")\n";
    }
    print "\n";

    # Initialize shared piece tracking
    @g_have    = (0) x $num_pieces;
    @g_pending = (0) x $num_pieces;
    $g_done    = 0;
    $g_dl      = 0;

    # Gather announce URLs (HTTP only; UDP trackers not supported)
    my @announce_urls;
    push @announce_urls, $torrent->{announce} if $torrent->{announce};
    if ($torrent->{'announce-list'}) {
        for my $tier (@{ $torrent->{'announce-list'} }) {
            for my $url (@$tier) {
                push @announce_urls, $url unless grep { $_ eq $url } @announce_urls;
            }
        }
    }
    @announce_urls = grep { /^https?:/ } @announce_urls;
    die "No HTTP tracker URLs found in torrent\n" unless @announce_urls;

    # Contact trackers
    print "Contacting trackers...\n";
    my @all_peers;
    for my $announce (@announce_urls) {
        my $resp = contact_tracker($announce, $info_hash, $peer_id,
                                   $total_size, 'started');
        if ($resp && $resp->{peers}) {
            my $peers = parse_compact_peers($resp->{peers});
            push @all_peers, @$peers;
            print "  Got " . scalar(@$peers) . " peers\n";
            last if @all_peers >= 50;
        }
    }
    die "No peers found from any tracker!\n" unless @all_peers;

    # De-duplicate peers
    my %seen;
    @all_peers = grep { !$seen{"$_->{ip}:$_->{port}"}++ } @all_peers;
    print "Unique peers: " . scalar(@all_peers) . "\n\n";

    my $start_time  = time();
    my $max_retries = 3;

    for my $retry (0 .. $max_retries) {
        { lock($g_lock); last if $g_done >= $num_pieces; }

        if ($retry > 0) {
            # Release any pending pieces from dead threads
            {
                lock($g_lock);
                for my $i (0 .. $num_pieces - 1) {
                    $g_pending[$i] = 0 unless $g_have[$i];
                }
            }

            print "Retry round $retry — re-announcing...\n";
            for my $announce (@announce_urls) {
                my $left;
                { lock($g_lock); $left = $total_size - $g_dl; }
                my $resp = contact_tracker($announce, $info_hash, $peer_id,
                                           $left, '');
                if ($resp && $resp->{peers}) {
                    my $peers = parse_compact_peers($resp->{peers});
                    for my $p (@$peers) {
                        my $key = "$p->{ip}:$p->{port}";
                        unless ($seen{$key}) {
                            push @all_peers, $p;
                            $seen{$key} = 1;
                        }
                    }
                }
            }
        }

        # Launch concurrent peer workers (up to MAX_PEERS at a time)
        my $npeers = scalar(@all_peers);
        my $batch_start = 0;

        while ($batch_start < $npeers) {
            { lock($g_lock); last if $g_done >= $num_pieces; }

            my $batch_end = $batch_start + MAX_PEERS - 1;
            $batch_end = $npeers - 1 if $batch_end >= $npeers;

            my @threads;
            for my $pi ($batch_start .. $batch_end) {
                { lock($g_lock); last if $g_done >= $num_pieces; }
                my $peer = $all_peers[$pi];
                push @threads, threads->create(
                    \&peer_worker,
                    $peer->{ip}, $peer->{port},
                    $info_hash, $peer_id,
                    $piece_length, $total_size,
                    $piece_hashes, $num_pieces,
                    \@files, $output_dir, $start_time
                );
            }

            my $launched = scalar @threads;
            if ($launched > 0) {
                print "Launched $launched peer connections "
                    . "(batch " . ($batch_start + 1) . "-"
                    . ($batch_start + $launched)
                    . " of $npeers)\n";
            }

            # Wait for all threads in this batch to finish
            for my $thr (@threads) {
                $thr->join();
            }

            $batch_start = $batch_end + 1;

            { lock($g_lock); last if $g_done >= $num_pieces; }
        }
    }

    my $final_done;
    { lock($g_lock); $final_done = $g_done; }

    if ($final_done < $num_pieces) {
        die "Download incomplete: $final_done/$num_pieces pieces. "
          . "Need more peers or retry.\n";
    }

    my $elapsed = (time() - $start_time) || 1;
    printf "\nDone! %s in %d seconds (%s/s avg)\n",
        format_size($total_size), $elapsed, format_size($total_size / $elapsed);
    printf "All %d pieces verified.\n", $num_pieces;
}

main();
