Changeset 1486

Show
Ignore:
Timestamp:
07/17/05 18:11:08 (3 years ago)
Author:
wubbla
Message:

m_connections is now a boost::multi_index_container; PartData::getRange() is now used to download specific ranges; MD5-hashes are now added to MetaData? automatically (if found)

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • testing/http/download.cpp

    r1484 r1486  
    7070    do { 
    7171        it = m_connections.begin(); 
    72         deleteConnection( (++it)->first ); 
     72        deleteConnection(++it); 
    7373    } while(it != m_connections.end()); 
    74  
    7574    m_connections.clear(); 
    7675} 
     
    7978void Download::onPDEvent(PartData *pd, int evt) {  
    8079    if(pd == m_pd && evt == PD_COMPLETE) { 
     80        logTrace(TRACE, "Download completed... now deleting"); 
    8181        HttpClient::instance().deleteDownload(this);  
    8282    } 
     
    9696             
    9797        case EVT_SF_FOUND:  
    98         { 
    9998            logTrace(TRACE, "Got EVT_SF_FOUND (sharedfile was found) --> starting download"); 
    100             int port = boost::lexical_cast<int>(m_url["port"]); 
    101             IPV4Address addr(m_url["host"], port); 
    102  
    103             Parser *p = createConnection(addr); 
    104             p->setHostName(m_url["hostname"]); 
    105             p->getFile(m_url["path"]); 
    106             p->writeData.connect(boost::bind(&Download::writeData, this, _b1, _b2)); 
    107              
    108             break; 
    109         }     
     99            getRandomChunk();     
     100            break; 
     101         
    110102        case EVT_ERROR: 
    111103            logError("An error occured..."); 
     
    114106} 
    115107 
    116 void Download::onParserEvent(Parser *p, ParserEvent evt) { 
     108void Download::getRandomChunk(uint64_t chunksize) { 
     109    logTrace(TRACE, "getRandomChunk()"); 
     110    uint64_t reqSize = m_size/4; 
     111    if(chunksize) {  
     112        reqSize = chunksize;  
     113    } 
     114    doGet(reqSize); 
     115
     116 
     117void Download::getWholeFile() { 
     118    logTrace(TRACE, "getWholeFile()"); 
     119    doGet(m_size); 
     120
     121 
     122void Download::doGet(uint64_t chunksize) { 
     123    logTrace(TRACE, "doGet()"); 
     124     
     125    Detail::UsedRangePtr used = m_pd->getRange(chunksize); 
     126    m_locked = used->getLock(chunksize); 
     127     
     128    int port = boost::lexical_cast<int>(m_url["port"]); 
     129    IPV4Address addr(m_url["host"], port); 
     130     
     131    Parser *p = createConnection(addr); 
     132    p->setHostName(m_url["hostname"]); 
     133    p->writeData.connect(boost::bind(&Download::writeData, this, _b1, _b2, _b3)); 
     134         
     135    if(chunksize != m_size) { 
     136        Range64 *range = new Range64(m_locked->begin(), m_locked->end()); 
     137             
     138        if(range->length() > 1) { 
     139            logTrace(TRACE,  
     140                boost::format("trying to get range: %d-%d")  
     141                % range->begin() % range->end() 
     142            );       
     143            p->getChunk(m_url["path"], range); 
     144        } 
     145    } else { 
     146        p->getFile(m_url["path"]); 
     147    }    
     148
     149 
     150void Download::onParserEvent(Parser *p, ParserEvent evt) try { 
    117151    HttpSocket *s = getSocket(p); 
     152    boost::format fmt("Parser(%p): %s"); 
     153    fmt % p; 
    118154    switch(evt) { 
    119155        case EVT_SIZE: 
    120             logTrace(TRACE, 
    121                 boost::format("Connection(%p) found new filesize") % s 
    122             ); 
     156            fmt % "found new filesize"; 
    123157            if(m_size > 0) { 
    124                 logError("Filesize is already set!")
     158                fmt % "Filesize is already set!"
    125159            } else { 
    126160                m_size = getParser(s)->getSize(); 
    127                 logTrace(TRACE, 
    128                     boost::format("setting filesize to %i") % m_size 
    129                 ); 
    130                  
     161                fmt % "setting filesize to %i" % m_size; 
    131162                deleteConnection(s); 
    132                  
    133163                if(m_size > 0) { 
    134164                    postEvent(EVT_SET_SIZE); 
    135165                }  
    136166            } 
    137             break; 
    138  
     167            break; 
     168         
     169        case EVT_COMPLETE: 
     170            fmt % "completed its download..."; 
     171            if(!m_pd->isComplete()) { 
     172                getRandomChunk(); 
     173            }  
     174            break; 
     175             
     176        case EVT_MD5:  
     177        { 
     178            fmt % "found a MD5 digest: %s" % p->getMD5(); 
     179            Hash<MD5Hash> hash(p->getMD5()); 
     180            HashSet<MD5Hash> *hs = new HashSet<MD5Hash>(hash); 
     181            m_md->addHashSet(hs); 
     182            break; 
     183        } 
     184             
     185        case EVT_NORANGES: 
     186            fmt % "doesn't support ranges, getting the whole file..."; 
     187            getWholeFile(); 
     188            break; 
     189         
    139190        case EVT_DESTROY: 
    140             logTrace(TRACE, 
    141                 boost::format("Connection(%p) wants to be destroyed :D") % s  
    142             ); 
     191            fmt % "wants to be destroyed :D";  
    143192            deleteConnection(s); 
    144193            break; 
    145194         
    146195        case EVT_FAILURE: 
    147             logTrace(TRACE, 
    148                 boost::format("Connection(%p) received an " 
    149                               "unsuccessful HTTP response: %s")  
    150                 % s % getParser(s)->getStatusCode() 
    151             ); 
     196            fmt % "received an unsuccessful HTTP response: %s"  
     197                % getParser(s)->getStatusCode(); 
    152198            break; 
    153199                     
    154200        default: 
    155             logTrace(TRACE, 
    156                 boost::format("Connection(%p) posted unhandled event: %p") % s % evt 
    157             );               
    158     } 
    159 
     201            fmt % "posted unhandled event: %p" % evt; 
     202    } 
     203    logTrace(TRACE, fmt.str()); 
     204     
     205} catch(std::exception &e) { 
     206    logError(std::string("onParserEvent():") + e.what()); 
     207
     208MSVC_ONLY(;) 
    160209 
    161210void Download::createDownload() { 
     
    231280} 
    232281 
    233 void Download::writeData(const std::string &data, uint64_t offset) { 
     282void Download::writeData(Parser *p, const std::string &data, uint64_t offset) { 
    234283    CHECK_THROW(m_pd != NULL); 
    235284     
    236     logTrace(TRACE, boost::format("writeData() at offset: %i") % offset); 
     285    logTrace(TRACE,  
     286        boost::format("writeData(parser=%p) at offset: %i") % p % offset 
     287    ); 
    237288     
    238289    try { 
    239         m_pd->write(offset, data); 
     290        if(m_locked) { 
     291            m_locked->write(offset, data); 
     292        } else { 
     293            m_pd->write(offset, data); 
     294        } 
    240295    } catch(std::exception &e) { 
    241296        logError(boost::format("Unable to write data: %s") % e.what()); 
     
    256311    Parser *p = new Parser; 
    257312    p->getEventTable().addHandler(p, this, &Download::onParserEvent); 
    258     //p->writeData.connect(boost::bind(&Download::writeData, this, _b1, _b2)); 
    259313    p->sendData.connect(boost::bind(&Download::sendData, this, _b1, _b2)); 
     314    Detail::LockedRangePtr ptr; 
    260315         
    261     m_connections.insert( std::make_pair(s, p) ); 
    262  
     316    m_connections.insert( Connection(s, p, ptr) ); 
    263317    return p; 
    264318} 
    265319 
    266320void Download::deleteConnection(HttpSocket *s) { 
    267     logTrace(TRACE,  
    268         boost::format("deleteConnection(%p)") % s     
    269     ); 
    270     connIter it = m_connections.find(s); 
    271      
    272     if(it != m_connections.end()) { 
     321    logTrace(TRACE, boost::format("deleteConnection(%p)") % s); 
     322     
     323    socketIter it = m_connections.get<socket>().find(s); 
     324     
     325    if(it != m_connections.get<socket>().end()) { 
    273326        HttpSocket *tmp = s; 
    274         Parser *p = (*it).second
     327        Parser *p = it->parser
    275328        delete s; 
    276329        delete p; 
     
    279332        m_connections.erase(tmp); 
    280333    } 
     334     
    281335    logTrace(TRACE, "deleteConnection finished..."); 
    282336} 
    283337 
     338void Download::deleteConnection(connIter it) { 
     339    logTrace(TRACE, "deleteConnection(connIter)"); 
     340     
     341    if(it != m_connections.end()) { 
     342        HttpSocket *s = it->socket; 
     343        Parser *p = it->parser; 
     344        delete s; 
     345        delete p; 
     346        p = 0; 
     347        s = 0;   
     348        m_connections.erase(it); 
     349    } else { 
     350        throw std::runtime_error("no such Connection"); 
     351    } 
     352} 
     353 
    284354void Download::sendData(Parser *p, const std::string &data) { 
    285     HttpSocket *s = getSocket(p); 
    286     s->write(data);     
     355    try {  
     356        HttpSocket *s = getSocket(p); 
     357        s->write(data);     
     358    } catch(std::exception &e) { 
     359        logError(boost::format("unable to get Socket: %s") % e.what()); 
     360    }  
    287361} 
    288362 
    289363HttpSocket* Download::getSocket(Parser *p) { 
    290     connIter it = m_connections.begin(); 
    291     for( ; it != m_connections.end(); ++it) { 
    292         if(it->second == p) { 
    293             return it->first; 
    294         } 
    295     } 
     364    logTrace(TRACE, boost::format("getSocket(parser=%p)") % p); 
     365    parserIter it = m_connections.get<parser>().find(p); 
     366    if(it != m_connections.get<parser>().end()) { 
     367        logTrace(TRACE, boost::format("found socket=%p") % it->socket); 
     368        return it->socket; 
     369    } else { 
     370        throw std::runtime_error("couldn't find socket in MIConn");      
     371    }     
    296372} 
    297373 
    298374Parser* Download::getParser(HttpSocket *s) { 
    299     connIter it = m_connections.find(s); 
    300     CHECK_THROW(it != m_connections.end()); 
    301     return it->second; 
     375    logTrace(TRACE, boost::format("getParser(socket=%p)") % s); 
     376    socketIter it = m_connections.get<socket>().find(s); 
     377    if(it != m_connections.get<socket>().end()) { 
     378        logTrace(TRACE, boost::format("found parser=%p") % it->parser); 
     379        return it->parser; 
     380    } else { 
     381        throw std::runtime_error("couldn't find parser in MIConn");      
     382    } 
    302383} 
    303384 
  • testing/http/download.h

    r1481 r1486  
    2626#include <hn/object.h> 
    2727#include <hn/ssocket.h> 
     28#include <boost/multi_index_container.hpp> 
     29#include <boost/multi_index/key_extractors.hpp> 
     30#include <boost/multi_index/ordered_index.hpp> 
     31#include <boost/multi_index/member.hpp> 
    2832 
    2933namespace Http { 
    3034 
     35using namespace boost; 
     36using namespace boost::multi_index; 
     37 
    3138typedef SSocket<HttpClient, Socket::Client, Socket::TCP> HttpSocket; 
    32 typedef std::map<HttpSocket*, Parser*> connMap; 
    33 typedef connMap::iterator connIter; 
     39 
     40typedef struct Connection { 
     41    HttpSocket* socket; 
     42    Parser* parser; 
     43    Detail::LockedRangePtr range;     
     44     
     45    Connection(HttpSocket *s, Parser *p, Detail::LockedRangePtr r)  
     46              : socket(s), parser(p), range(r) {} 
     47}; 
     48 
     49//tags for accessing the corresponding indices of MIConn 
     50struct socket{}; 
     51struct parser{}; 
     52struct range{}; 
     53 
     54typedef multi_index_container<  
     55    Connection,  
     56    indexed_by< 
     57        ordered_unique< 
     58            tag<socket>,  
     59            BOOST_MULTI_INDEX_MEMBER(Connection, HttpSocket*, socket) 
     60        >, 
     61        ordered_unique< 
     62            tag<parser>,  
     63            BOOST_MULTI_INDEX_MEMBER(Connection, Parser*, parser) 
     64        >, 
     65        ordered_unique< 
     66            tag<range>,  
     67            BOOST_MULTI_INDEX_MEMBER(Connection, Detail::LockedRangePtr, range) 
     68        > 
     69    > 
     70> MIConn; 
     71 
     72typedef MIConn::iterator connIter; 
     73typedef MIConn::nth_index<0>::type::iterator socketIter; 
     74typedef MIConn::nth_index<1>::type::iterator parserIter; 
     75typedef MIConn::nth_index<2>::type::iterator rangeIter; 
    3476 
    3577enum DownloadEvent { 
     
    4587    ~Download(); 
    4688     
    47     void writeData(const std::string &data, uint64_t offset); 
     89    void writeData(Parser *p, const std::string &data, uint64_t offset); 
    4890    void sendData(Parser *p, const std::string &data); 
    4991 
     
    5496    void postEvent(DownloadEvent evt);   
    5597    void onDownloadEvent(DownloadEvent evt); 
     98     
     99    void getWholeFile(); 
     100    void getRandomChunk(uint64_t chunksize = 0); 
    56101     
    57102    //! Comparison operator for std containers 
     
    68113    Parser* createConnection(const IPV4Address &addr); 
    69114    void deleteConnection(HttpSocket *s); 
     115    void deleteConnection(connIter it); 
    70116    void findSharedFile(); 
    71117    HttpSocket* getSocket(Parser *p); 
    72118    Parser* getParser(HttpSocket *s); 
    73          
     119    void doGet(uint64_t chunksize); 
     120             
    74121    std::map<std::string, std::string> m_url; 
    75     connMap m_connections; 
     122    MIConn m_connections; 
    76123     
    77124    uint64_t    m_size; 
     
    80127    SharedFile* m_sf; 
    81128    MetaData*   m_md;     
     129    Detail::LockedRangePtr m_locked; 
    82130}; 
    83131 
  • testing/http/http.h

    r1481 r1486  
    3737    EVT_DATA, 
    3838    EVT_FAILURE, 
    39     EVT_DESTROY 
     39    EVT_DESTROY, 
     40    EVT_COMPLETE, 
     41    EVT_NORANGES 
    4042}; 
    4143 
  • testing/http/parser.cpp

    r1483 r1486  
    110110} 
    111111 
     112std::string Parser::getMD5() { 
     113    return getHeader("content-md5"); 
     114} 
     115 
    112116void Parser::tryRequest(const std::string &file) { 
    113117    //absolute path AND hostname: 
     
    219223            getEventTable().postEvent(this, EVT_SIZE); 
    220224        } 
     225        if(getHeader("accept-ranges") != "bytes") { 
     226            getEventTable().postEvent(this, EVT_NORANGES); 
     227        }        
    221228    } 
    222229} 
     
    248255     
    249256    if(success) { 
    250         writeData(data, m_offset); 
     257        writeData(this, data, m_offset); 
    251258        m_offset += data.size(); 
     259    } 
     260    if(m_offset == m_range->end() + 1) { 
     261        getEventTable().postEvent(this, EVT_COMPLETE); 
    252262    } 
    253263} 
  • testing/http/parser.h

    r1482 r1486  
    4242    void setHostName(const std::string &hostname); 
    4343     
    44     std::string         getHeader(const std::string &header); 
    45     int                 getStatusCode(); 
    46     uint64_t            getSize();     
    47     void                getFile(const std::string &file); 
    48     void                getInfo(const std::string &file); 
    49     void                getChunk(const std::string &file, Range64 *range); 
    50                  
    51     boost::signal<void (const std::string&, uint64_t)> writeData; 
     44    std::string getHeader(const std::string &header); 
     45    std::string getMD5(); 
     46    int         getStatusCode(); 
     47    uint64_t    getSize();     
     48    void        getFile(const std::string &file); 
     49    void        getInfo(const std::string &file); 
     50    void        getChunk(const std::string &file, Range64 *range); 
     51     
     52    boost::signal<void (Parser*, const std::string&, uint64_t)> writeData; 
    5253    boost::signal<void (Parser*, const std::string&)> sendData; 
    5354