Changeset 2760

Show
Ignore:
Timestamp:
03/04/06 03:46:21 (3 years ago)
Author:
madcat
Message:

TCP server source requests are now split to 15-file-blocks, and sent at 4-minute intervals (max 75 files per 20 minutes). Rarest files are asked first.
UDP server source requests are now based on the rarity of the file, so that the rarest 25 or 31 files are always asked for. This should balance the average source count of large downloadlist, and ensure that all files have at least some sources.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • hydranode/hncore/ed2k/downloadlist.cpp

    r2613 r2760  
    4545// -------------- 
    4646Download::Download(PartData *pd, const Hash<ED2KHash> &hash) : m_partData(pd), 
    47 m_hash(hash), m_lastSrcExch(), m_sourceLimit(), m_lastUdpQuery()
     47m_hash(hash), m_lastSrcExch(), m_sourceLimit()
    4848        pd->getSourceCnt.connect(boost::bind(&Download::getSourceCount, this)); 
    4949        pd->getLinks.connect(boost::bind(&Download::getLink, this, _1, _2)); 
     
    157157                                        &Download::getHash 
    158158                                > 
    159                         >, 
    160                         boost::multi_index::ordered_non_unique< 
    161                                 boost::multi_index::const_mem_fun< 
    162                                         Download, uint32_t, 
    163                                         &Download::getSourceCount 
    164                                 > 
    165                         >, 
    166                         boost::multi_index::ordered_non_unique< 
    167                                 boost::multi_index::const_mem_fun< 
    168                                         Download, uint64_t, 
    169                                         &Download::getLastUdpQuery 
    170                                 > 
    171159                        > 
    172160                > 
    173161        > {}; 
    174         enum { ID_PD, ID_Download, ID_Hash, ID_SrcCnt, ID_UdpQTime }; 
     162        enum { ID_PD, ID_Download, ID_Hash, ID_UdpQTime }; 
    175163        typedef MIDownloadList::nth_index<ID_PD>::type::iterator Iter; 
    176164        typedef MIDownloadList::nth_index<ID_Download>::type::iterator DIter; 
    177165        typedef MIDownloadList::nth_index<ID_Hash>::type::iterator HashIter; 
    178         typedef MIDownloadList::nth_index<ID_SrcCnt>::type::iterator SrcCIter; 
    179         typedef MIDownloadList::nth_index<ID_UdpQTime>::type::iterator QTIter; 
    180166        struct MIDownloadListIterator : public Iter { 
    181167                template<typename T> 
     
    255241        Detail::Iter it = list.find(pd); 
    256242        if (it != list.end() && pd->isRunning()) { 
    257                 // resumed download - force next UDP query 
    258                 list.modify(it, bind(&Download::m_lastUdpQuery, __1) = 0); 
    259243                onAdded(**it); 
    260244                return; 
     
    288272                        m_list->erase(it); 
    289273                } 
    290         } else if (evt == PD_PAUSED) { 
    291                 Detail::Iter it = m_list->find(pd); 
    292                 if (it != m_list->end()) { 
    293                         // disables UDP queries for this file 
    294                         m_list->modify( 
    295                                 it, bind(&Download::m_lastUdpQuery, __1) 
    296                                 = std::numeric_limits<uint32_t>::max() 
    297                         ); 
    298                 } 
    299274        } else if (evt == PD_RESUMED) { 
    300275                tryAddFile(pd); 
     
    308283                } 
    309284        } 
    310 } 
    311  
    312 Download* DownloadList::getNextForUdpQuery() { 
    313         uint64_t curTick = Utils::getTick(); 
    314         Detail::QTIter it = m_list->get<Detail::ID_UdpQTime>().lower_bound(0); 
    315  
    316         if (it != m_list->get<Detail::ID_UdpQTime>().end()) { 
    317                 m_list->get<Detail::ID_UdpQTime>().modify( 
    318                         it, bind(&Download::m_lastUdpQuery, __1) = curTick 
    319                 ); 
    320                 if ((*it)->getPartData()->isRunning()) { 
    321                         return *it; 
    322                 } 
    323         } 
    324         return 0; 
    325285} 
    326286 
  • hydranode/hncore/ed2k/downloadlist.h

    r2613 r2760  
    6060        uint32_t getSourceCount() const { return m_sources.size(); } 
    6161        uint64_t getLastSrcExch() const { return m_lastSrcExch; } 
    62         uint64_t getLastUdpQuery() const { return m_lastUdpQuery; } 
    6362        PartData* getPartData() const { return m_partData; } 
    6463        uint32_t getSourceLimit() const { return m_sourceLimit; } 
     
    124123        uint64_t          m_lastSrcExch;   //!< time of last source-exchange req 
    125124        uint32_t          m_sourceLimit;   //!< limit sources 
    126         uint64_t          m_lastUdpQuery;  //!< last udp query time 
    127125}; 
    128126 
     
    193191        bool valid(Download *ptr) const; 
    194192 
    195         /** 
    196          * Selects next download for sending ServerUDP query to 
    197          * 
    198          * \returns Download to be queried, or 0 if something goes wrong 
    199          */ 
    200         Download* getNextForUdpQuery(); 
    201  
    202193        //! Emitted when a download is removed 
    203194        boost::signal<void (Download&)> onRemoved; 
  • hydranode/hncore/ed2k/serverlist.cpp

    r2752 r2760  
    732732        } 
    733733 
     734        std::vector<Download*> downloads; 
    734735        DownloadList& list = DownloadList::instance(); 
    735         for (DownloadList::Iter it = list.begin(); it != list.end(); ++it) { 
    736                 if ((*it).getPartData()->isRunning()) { 
    737                         reqSources(*it); 
    738                 } 
    739         } 
    740  
     736        // can't use std::copy since Iter isn't fully standard-conformant 
     737        for (DownloadList::Iter i = list.begin(); i != list.end(); ++i) { 
     738                downloads.push_back(&*i); 
     739        } 
     740        std::sort( 
     741                downloads.begin(), downloads.end(), 
     742                boost::bind(&Download::getSourceCount, _1) < 
     743                boost::bind(&Download::getSourceCount, _2) 
     744        ); 
     745        std::string packet; 
     746        uint32_t written = 0, packets = 0; 
     747        for (size_t i = 0; i < downloads.size(); ++i) { 
     748                if (!downloads[i]->getPartData()->isRunning()) { 
     749                        continue; 
     750                } 
     751                packet += ED2KPacket::ReqSources( 
     752                        downloads[i]->getHash(), downloads[i]->getSize() 
     753                ); 
     754                if (++written == 15 && i <= 15) { 
     755                        // first 15 downloads - request now 
     756                        *m_serverSocket << packet; 
     757                        packet.clear(); 
     758                        written = 0; 
     759                        ++packets; 
     760                } else if (written == 15 && packets < 5) { 
     761                        // next 15 downloads, up to a maximum of 4 additional 
     762                        // packets scheduled over the course of next 16 minutes 
     763                        Utils::timedCallback( 
     764                                boost::bind( 
     765                                        &ServerList::reqMoreSources, this,  
     766                                        m_serverSocket, packet 
     767//                                      &ED2KClientSocket::write, m_serverSocket,  
     768//                                      packet 
     769                                ), packets * 245 * 1000 
     770                        ); 
     771                        packet.clear(); 
     772                        written = 0; 
     773                        ++packets; 
     774                } 
     775        } 
     776        if (written && packets == 0) { 
     777                *m_serverSocket << packet; 
     778        } else if (written && packets < 5) { 
     779                Utils::timedCallback( 
     780                        boost::bind( 
     781                                &ServerList::reqMoreSources, this,  
     782                                m_serverSocket, packet 
     783//                                      &ED2KClientSocket::write, m_serverSocket,  
     784//                                      packet 
     785                        ), packets * 245 * 1000 
     786                ); 
     787        } 
     788 
     789        // schedule next loop 
    741790        m_lastSourceRequest = Utils::getTick(); 
    742791        getEventTable().postEvent(this, EVT_REQSOURCES, SOURCEREASKTIME); 
     
    749798                if (m_serverSocket && m_serverSocket->isConnected()) { 
    750799                        reqSources(); 
     800                } else { 
     801                        m_lastSourceRequest = Utils::getTick(); 
     802                        getEventTable().postEvent( 
     803                                this, EVT_REQSOURCES, SOURCEREASKTIME 
     804                        ); 
    751805                } 
    752806        } else if (evt == EVT_PINGSERVER) { 
     
    10011055        } 
    10021056 
     1057        std::vector<Download*> downloads; 
     1058        DownloadList& list = DownloadList::instance(); 
     1059        // can't use std::copy since Iter isn't fully standard-conformant 
     1060        for (DownloadList::Iter i = list.begin(); i != list.end(); ++i) { 
     1061                downloads.push_back(&*i); 
     1062        } 
     1063        std::sort( 
     1064                downloads.begin(), downloads.end(), 
     1065                boost::bind(&Download::getSourceCount, _1) < 
     1066                boost::bind(&Download::getSourceCount, _2) 
     1067        ); 
     1068 
    10031069        uint32_t cnt = 0; 
    10041070        ED2KPacket::GlobGetSources packet(sendSize); 
    1005         while (cnt <= limit) { 
    1006                 Download *d = DownloadList::instance().getNextForUdpQuery(); 
    1007                 if (d) { 
     1071        if (downloads.size() < limit) { 
     1072                limit = downloads.size(); 
     1073        } 
     1074        for (size_t i = 0; i < limit; ++i) { 
     1075                Download *d = downloads[i]; 
     1076                if (d->getPartData()->isRunning()) { 
    10081077                        packet.addHash(d->getHash(), d->getSize()); 
    10091078                        ++cnt; 
    1010                 } else
    1011                         break
     1079                } else if (limit < downloads.size())
     1080                        ++limit
    10121081                } 
    10131082        } 
     
    12961365} 
    12971366 
     1367void ServerList::reqMoreSources(ED2KClientSocket *sock, std::string data) { 
     1368        if (m_serverSocket && sock == m_serverSocket && sock->isConnected()) { 
     1369                logDebug("Sending source request to server."); 
     1370                CHECK(data.size()); 
     1371                *m_serverSocket << data; 
     1372        } else { 
     1373                logDebug( 
     1374                        "Source request callback, but server " 
     1375                        "has changed since then." 
     1376                ); 
     1377        } 
     1378} 
     1379 
    12981380} // end namespace Donkey 
  • hydranode/hncore/ed2k/serverlist.h

    r2738 r2760  
    232232 
    233233        /** 
     234         * Requests sources from current server for next 15 files; this method 
     235         * is called via timed callbacks and shouldn't be called directly. 
     236         * 
     237         * @param sock        Socket to write to; writing is only performed if 
     238         *                    it matches m_serverSocket. 
     239         * @param data        Data to be written to the socket. 
     240         */ 
     241        void reqMoreSources(ED2KClientSocket *sock, std::string data); 
     242 
     243        /** 
    234244         * Chooses next server to perform UDP queries with 
    235245         */