root/hydranode/hncore/bt/client.cpp

Revision 2913, 27.4 kB (checked in by madcat, 3 years ago)

Multi-tracker support.

Line 
1 /*
2  *  Copyright (C) 2005-2006 Alo Sarv <madcat_@users.sourceforge.net>
3  *
4  *  This program is free software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License as published by
6  *  the Free Software Foundation; either version 2 of the License, or
7  *  (at your option) any later version.
8  *
9  *  This program is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  You should have received a copy of the GNU General Public License
15  *  along with this program; if not, write to the Free Software
16  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17  */
18
19 #include <hncore/bt/client.h>
20 #include <hncore/bt/torrent.h>
21 #include <hncore/bt/files.h>
22 #include <hncore/bt/bittorrent.h>
23 #include <hnbase/timed_callback.h>
24 #include <boost/spirit.hpp>
25 #include <stack>
26
27 #define COL_SEND COL_CYAN
28 #define COL_RECV COL_GREEN
29
30 namespace Bt {
31 const std::string TRACE("bt.client");
32 std::map<std::string, std::string> s_clientNames;
33
34 void fillClientNames() {
35         s_clientNames["AZ"] = "Azureus";
36         s_clientNames["BB"] = "BitBuddy";
37         s_clientNames["BC"] = "BitComet";
38         s_clientNames["CT"] = "CTorrent";
39         s_clientNames["ML"] = "MoonlightTorrent";
40         s_clientNames["LT"] = "libtorrent";
41         s_clientNames["BX"] = "Bittorrent X";
42         s_clientNames["TS"] = "Torrentstorm";
43         s_clientNames["TN"] = "TorrentDotNET";
44         s_clientNames["SS"] = "SwarmScope";
45         s_clientNames["XT"] = "XanTorrent";
46         s_clientNames["BS"] = "BTSlave";
47         s_clientNames["ZT"] = "ZipTorrent";
48         s_clientNames["AR"] = "Arctic";
49         s_clientNames["SB"] = "Swiftbit";
50         s_clientNames["MP"] = "MooPolice";
51         s_clientNames["HN"] = "Hydranode";
52         s_clientNames["KT"] = "KTorrent";
53         s_clientNames["UT"] = "µTorrent";
54         s_clientNames["lt"] = "libtorrent";
55         s_clientNames["S"] = "Shadow's client";
56         s_clientNames["U"] = "UPnP NAT Bit Torrent";
57         s_clientNames["T"] = "BitTornado";
58         s_clientNames["A"] = "ABC";
59         s_clientNames["O"] = "Osprey Permaseed";
60 }
61
62 // Client::Request class
63 // ---------------------
64 Client::Request::Request() : m_index(), m_offset(), m_length() {}
65
66 Client::Request::Request(uint64_t index, uint64_t offset, uint64_t length)
67 : m_index(index), m_offset(offset), m_length(length) {}
68
69 Client::Request::Request(::Detail::LockedRangePtr r, Torrent *t)
70 : m_index(), m_offset(), m_length(), m_locked(r) {
71         m_index  = m_locked->begin() / t->getChunkSize();
72         m_offset = m_locked->begin() - t->getChunkSize() * m_index;
73         m_length = m_locked->length();
74 }
75
76 // Client class
77 // ------------
78 Client::Client(TcpSocket *sock) : BaseClient(&BitTorrent::instance()),
79 m_socket(sock), m_addr(sock->getPeer()), m_isChoking(true), m_isInterested(),
80 m_amChoking(true), m_amInterested(), m_handshakeSent(), m_needParts(), m_file(),
81 m_partData(), m_torrent(), m_sourceMaskAdded() {
82         if (!s_clientNames.size()) {
83                 fillClientNames();
84         }
85
86         logTrace(TRACE, "Client connected.");
87
88         setConnected(true);
89         sock->setHandler(this, &Client::onSocketEvent);
90         m_inBuffer.append(sock->getData());
91         if (m_inBuffer.size()) {
92                 parseBuffer();
93         }
94         Utils::timedCallback(boost::bind(&Client::sendPing, this), 2*60*1000);
95 }
96
97 Client::Client(IPV4Address addr) : BaseClient(&BitTorrent::instance()),
98 m_socket(new TcpSocket), m_addr(addr), m_isChoking(true), m_isInterested(),
99 m_amChoking(true), m_amInterested(), m_handshakeSent(), m_needParts(), m_file(),
100 m_partData(), m_torrent(), m_sourceMaskAdded() {
101         if (!s_clientNames.size()) {
102                 fillClientNames();
103         }
104
105         logTrace(TRACE, boost::format("Connecting to %s") % addr);
106
107         m_socket->setHandler(this, &Client::onSocketEvent);
108         m_socket->connect(addr, 30000);
109         Utils::timedCallback(boost::bind(&Client::sendPing, this), 2*60*1000);
110 }
111
112 Client::~Client() {
113         m_currentSpeedMeter.disconnect();
114         m_currentUploadMeter.disconnect();
115         if (m_partData && m_torrent && m_sourceMaskAdded) try {
116                 m_partData->delSourceMask(m_torrent->getChunkSize(),m_bitField);
117         } catch (std::exception &e) {
118                 logTrace(TRACE,
119                         boost::format(
120                                 "[%s] Exception while removing source-mask: %s"
121                         ) % m_addr % e.what()
122                 );
123         }
124 }
125
126 void Client::onSocketEvent(TcpSocket *sock, SocketEvent evt) {
127         if (
128                 (m_outRequests.size() || m_requests.size())
129                 && m_torrent->getPeerCount() > 50
130         ) {
131                 // when downloading or uploading is in progress, 10s timeout
132                 sock->setTimeout(10000);
133         } else {
134                 sock->setTimeout(2*70*1000); // 140 seconds timeout
135         }
136
137         if (evt == SOCK_READ) {
138                 m_inBuffer.append(sock->getData());
139                 parseBuffer();
140         } else if (evt == SOCK_WRITE && m_requests.size()) {
141                 sendNextChunk();
142         } else if (evt == SOCK_CONNECTED) {
143                 logTrace(TRACE,
144                         boost::format("[%s] Connection established.") % m_addr
145                 );
146                 setConnected(true);
147                 sendHandshake();
148         } else if (!sock->isConnected()) {
149                 boost::format fmt("[%s] %s");
150                 fmt % m_addr;
151                 switch (evt) {
152                         case SOCK_LOST:
153                                 fmt % "Remote host closed connection";
154                                 break;
155                         case SOCK_TIMEOUT:
156                                 fmt % "Connection timed out (no events)";
157                                 break;
158                         case SOCK_ERR:
159                                 fmt % "Socket error "
160                                         "(remote abnormal disconnection)";
161                                 break;
162                         case SOCK_CONNFAILED:
163                                 fmt % "Connection attempt refused";
164                                 break;
165                         default:
166                                 fmt % "Connection lost due to unknown reason";
167                                 break;
168                 }
169                 logTrace(TRACE, fmt);
170                 setConnected(false);
171                 connectionLost(this);
172         }
173 }
174
175 void Client::parsePeerId(const std::string &peerId) {
176         if (!peerId.size()) {
177                 m_clientSoft = "Unknown (no peer-id sent)";
178         }
179         std::string tmp;
180         std::string ver;
181         using namespace boost::spirit;
182         if (parse(peerId.data(), peerId.data() + peerId.size(),
183                 '-' >> repeat_p(2)[graph_p[push_back_a(tmp)]]
184                 >> repeat_p(4)[graph_p[push_back_a(ver)]] >> '-' >> *anychar_p
185         ).full && s_clientNames[tmp] != "") {
186                 boost::format fmt("v%d.%d.%d.%d");
187                 m_clientSoft = s_clientNames[tmp];
188                 fmt % ver[0] % ver[1] % ver[2] % ver[3];
189                 m_clientVersion = fmt.str();
190         } else if (parse(peerId.data(), peerId.data() + peerId.size(),
191                 alpha_p[assign_a(tmp)] >> repeat_p(3)[
192                         alnum_p[push_back_a(ver)]
193                 ] >> "--" >> *anychar_p
194         ).full && s_clientNames[tmp] != "") {
195                 boost::format fmt("v%d.%d.%d");
196                 m_clientSoft = s_clientNames[tmp];
197                 fmt % ver[0] % ver[1] % ver[2];
198                 m_clientVersion = fmt.str();
199         } else if (parse(peerId.data(), peerId.data() + peerId.size(),
200                 'M' >> graph_p[push_back_a(ver)] >> '-'
201                 >> graph_p[push_back_a(ver)] >> '-'
202                 >> graph_p[push_back_a(ver)] >> "--" >> *anychar_p
203         ).full) {
204                 boost::format fmt("v%d.%d.%d");
205                 fmt % ver[0] % ver[1] % ver[2];
206                 m_clientSoft = "Mainline";
207                 m_clientVersion = fmt.str();
208         } else if (parse(peerId.data(), peerId.data() + peerId.size(),
209                 "OP" >> repeat_p(4)[anychar_p[push_back_a(ver)]] >> *anychar_p
210         ).full) {
211                 boost::format fmt("build %d%d%d%d");
212                 m_clientSoft = "Opera";
213                 fmt % ver[0] % ver[1] % ver[2] % ver[3];
214                 m_clientVersion = fmt.str();
215         } else if (parse(peerId.data(), peerId.data() + peerId.size(),
216                 "XBT" >> repeat_p(4)[anychar_p[push_back_a(ver)]] >> *anychar_p
217         ).full) {
218                 boost::format fmt("v%d.%d.%d %s");
219                 fmt % ver[0] % ver[1] % ver[2];
220                 if (ver[3] == 'd') {
221                         fmt % "(debug build)";
222                 } else {
223                         fmt % "";
224                 }
225                 m_clientSoft = "XBT Client";
226                 m_clientVersion = fmt.str();
227         } else if (parse(peerId.data(), peerId.data() + peerId.size(),
228                 "-ML" >> anychar_p[push_back_a(ver)] >> '.'
229                 >> anychar_p[push_back_a(ver)] >> '.' >>
230                 anychar_p[push_back_a(ver)] >> *anychar_p
231         ).full) {
232                 m_clientSoft = "MLDonkey";
233                 boost::format fmt("v%s.%s.%s");
234                 m_clientVersion = (fmt % ver[0] % ver[1] % ver[2]).str();
235         } else if (parse(peerId.data(), peerId.data() + peerId.size(),
236                 "exbc" >> *anychar_p
237         ).full) {
238                 m_clientSoft = "BitComet (old)";
239         } else {
240                 m_clientSoft = "Unknown client: " + peerId;
241         }
242 }
243
244 void Client::sendPing() {
245         BEOStream tmp;
246         Utils::putVal<uint32_t>(tmp, 0); // len = 0, no payload
247         *m_socket << tmp.str();
248
249         logTrace(TRACE,
250                 boost::format(COL_SEND "[%s] << PING" COL_NONE)
251                 % m_addr
252         );
253 }
254
255 void Client::sendChoke() {
256         if (m_amChoking) {
257                 return;
258         }
259         bool dontChoke(false);
260
261         m_amChoking = true;
262         m_torrent->tryUnchoke(this, &dontChoke);
263
264         // we should continue uploading, retry in 30 seconds
265         if (dontChoke) {
266                 Utils::timedCallback(
267                         boost::bind(&Client::sendChoke, this),
268                         27000 + (Utils::getRandom() % 7) * 1000
269                 );
270                 m_amChoking = false;
271                 return;
272         }
273
274         BEOStream tmp;
275         Utils::putVal<uint32_t>(tmp, 1);  // len = 1
276         Utils::putVal<uint8_t >(tmp, 0);  // id  = 0
277         *m_socket << tmp.str();
278
279         logTrace(TRACE,
280                 boost::format(COL_SEND "[%s] <= CHOKE" COL_NONE)
281                 % m_addr
282         );
283         m_amChoking = true;
284         m_requests.clear();
285         setUploading(false, m_file);
286 }
287
288 void Client::sendUnchoke() {
289         if (!m_amChoking) {
290                 return;
291         }
292
293         BEOStream tmp;
294         Utils::putVal<uint32_t>(tmp, 1);  // len = 1
295         Utils::putVal<uint8_t >(tmp, 1);  // id  = 1
296         *m_socket << tmp.str();
297
298         logTrace(TRACE,
299                 boost::format(COL_SEND "[%s] <= UNCHOKE" COL_NONE)
300                 % m_addr
301         );
302         m_amChoking = false;
303         Utils::timedCallback(
304                 boost::bind(&Client::sendChoke, this),
305                 27000 + (Utils::getRandom() % 7) * 1000
306         );
307 }
308
309 void Client::sendInterested() {
310         if (m_amInterested) {
311                 return;
312         }
313
314         BEOStream tmp;
315         Utils::putVal<uint32_t>(tmp, 1);  // len = 1
316         Utils::putVal<uint8_t >(tmp, 2);  // id  = 2
317         *m_socket << tmp.str();
318
319         logTrace(TRACE,
320                 boost::format(COL_SEND "[%s] <= INTERESTED" COL_NONE)
321                 % m_addr
322         );
323         m_amInterested = true;
324 }
325
326 void Client::sendUninterested() {
327         if (!m_amInterested) {
328                 return;
329         }
330
331         BEOStream tmp;
332         Utils::putVal<uint32_t>(tmp, 1);  // len = 1
333         Utils::putVal<uint8_t >(tmp, 3);  // id  = 3
334         *m_socket << tmp.str();
335
336         logTrace(TRACE,
337                 boost::format(COL_SEND "[%s] <= UNINTERESTED" COL_NONE)
338                 % m_addr
339         );
340         m_amInterested = false;
341 }
342
343 void Client::sendHave(uint32_t index) {
344         BEOStream tmp;
345         Utils::putVal<uint32_t>(tmp, 5);     // len = 5
346         Utils::putVal<uint8_t >(tmp, 4);     // id  = 4
347         Utils::putVal<uint32_t>(tmp, index); // piece index
348         *m_socket << tmp.str();
349
350         logTrace(TRACE,
351                 boost::format(COL_SEND "[%s] <= HAVE index=%d" COL_NONE)
352                 % m_addr % index
353         );
354 }
355
356 void Client::sendBitfield() {
357         CHECK_THROW(m_torrent);
358         // if we don't have anything yet, don't send this
359         if (m_partData && !m_partData->getCompleted()) {
360                 return;
361         }
362         std::string bitField = m_torrent->getBitfield();
363
364         BEOStream tmp;
365         Utils::putVal<uint32_t>(tmp, bitField.size() + 1);   // len = 1 + X
366         Utils::putVal<uint8_t >(tmp, 5);                     // id = 5
367         Utils::putVal<std::string>(tmp, bitField, bitField.size());
368         *m_socket << tmp.str();
369
370         logTrace(TRACE,
371                 boost::format(COL_SEND "[%s] <= BITFIELD" COL_NONE)
372                 % m_addr
373         );
374 }
375
376 void Client::sendRequest(uint32_t index, uint32_t offset, uint32_t length) {
377         CHECK(m_bitField.at(index));
378         CHECK(offset + length - 1 <= m_torrent->getChunkSize());
379
380         BEOStream tmp;
381         Utils::putVal<uint32_t>(tmp, 13);      // len = 13
382         Utils::putVal<uint8_t >(tmp,  6);      // id  = 6
383         Utils::putVal<uint32_t>(tmp, index);   // piece index
384         Utils::putVal<uint32_t>(tmp, offset);  // relative offset to piece begin
385         Utils::putVal<uint32_t>(tmp, length);  // chunk length
386         *m_socket << tmp.str();
387
388         logTrace(TRACE,
389                 boost::format(
390                         COL_SEND
391                         "[%s] <= REQUEST index=%d offset=%d length=%d"
392                         COL_NONE
393                 ) % m_addr % index % offset % length
394         );
395 }
396
397 void Client::sendPiece(uint32_t index,uint32_t offset,const std::string &data) {
398         BEOStream tmp;
399         Utils::putVal<uint32_t>(tmp, 9 + data.size());      // len = 9 + X
400         Utils::putVal<uint8_t >(tmp, 7);                    // id  = 7
401         Utils::putVal<uint32_t>(tmp, index);                // piece index
402         Utils::putVal<uint32_t>(tmp, offset);               // relative offset
403         Utils::putVal<std::string>(tmp, data, data.size()); // data
404         *m_socket << tmp.str();
405
406         logTrace(TRACE,
407                 boost::format(
408                         COL_SEND "[%s] <= PIECE index=%d offset=%d length=%d"
409                         COL_NONE
410                 ) % m_addr % index % offset % data.size()
411         );
412 }
413
414 void Client::sendCancel(uint32_t index, uint32_t offset, uint32_t length) {
415         BEOStream tmp;
416         Utils::putVal<uint32_t>(tmp, 13);      // len = 13
417         Utils::putVal<uint8_t >(tmp,  8);      // id  =  8
418         Utils::putVal<uint32_t>(tmp, index);   // piece index
419         Utils::putVal<uint32_t>(tmp, offset);  // relative offset to piece begin
420         Utils::putVal<uint32_t>(tmp, length);  // chunk length
421         *m_socket << tmp.str();
422
423         logTrace(TRACE,
424                 boost::format(
425                         COL_SEND
426                         "[%s] <= CANCEL index=%d offset=%d length=%d"
427                         COL_NONE
428                 ) % m_addr % index % offset % length
429         );
430 }
431
432 // note: in logfile, it appears as if we always send handshake first (even for
433 // incoming clients), even though BT protocol spec says to wait for handshake
434 // in incoming clients; this is not so, the log is wrong, and we wait for remote
435 // handshake before sending ours for incoming connections; it appears
436 // differently in log due to some internal code logic in this function.
437 void Client::parseBuffer() {
438         if (m_inBuffer.size() < 4) {
439                 return;
440         }
441         if (!m_peerId.size() && m_inBuffer.size() >= 48) {
442                 using namespace boost::spirit;
443                 uint32_t strLen;
444                 std::string protName;
445                 std::string protFlags;
446                 std::string tmpHash;
447                 parse_info<> info = parse(
448                         m_inBuffer.data(), m_inBuffer.data()+ m_inBuffer.size(),
449                         anychar_p[assign_a(strLen)]
450                         >> repeat_p(boost::ref(strLen))[
451                                 anychar_p[push_back_a(protName)]
452                         ] >> repeat_p(8)[anychar_p[push_back_a(protFlags)]]
453                         >> repeat_p(20)[anychar_p[push_back_a(tmpHash)]]
454                         >> repeat_p(20)[anychar_p[push_back_a(m_peerId)]]
455                 );
456
457                 if (tmpHash.size() == 20) {
458                         m_infoHash = tmpHash;
459                         handshakeReceived(this);
460
461                         // handshakeReceived signal may result in this being
462                         // deleted, when the info_hash was unknown; return
463                         // quickly then.
464                         if (!m_torrent) {
465                                 return;
466                         }
467
468                         if (!m_handshakeSent) {
469                                 sendHandshake();
470                                 m_handshakeSent = true;
471                         }
472                 }
473                 if (m_peerId.size()) {
474                         // get rid of non-printable characters
475                         for (uint32_t i = 0; i < m_peerId.size(); ++i) {
476                                 if (m_peerId[i] < 32 || m_peerId[i] > 126) {
477                                         m_peerId.replace(i, 1, ".");
478                                 }
479                         }
480                         parsePeerId(m_peerId);
481                         logTrace(TRACE,
482                                 boost::format(
483                                         COL_RECV "[%s] => HANDSHAKE from %s %s"
484                                         COL_NONE
485                                 ) % m_addr % getSoft() % getSoftVersion()
486                         );
487                         m_inBuffer.erase(0, 48 + strLen + 1);
488                 }
489         }
490
491         if (m_peerId.size()) {
492                 while (m_inBuffer.size() >= 4) {
493                         BEIStream tmp(m_inBuffer.substr(0, 4));
494                         uint32_t len = Utils::getVal<uint32_t>(tmp);
495                         if (len == 0) {
496                                 onPing();
497                         } else {
498                                 if (m_inBuffer.size() < len + 4) {
499                                         break;
500                                 }
501                                 BEIStream packet(m_inBuffer.substr(4, len));
502                                 try {
503                                         parsePacket(packet, len);
504                                 } catch (std::exception &e) {
505                                         logDebug(
506                                                 boost::format("[%s] %s")
507                                                 % m_addr % e.what()
508                                         );
509                                         connectionLost(this);
510                                         return;
511                                 }
512                         }
513                         m_inBuffer.erase(0, len + 4);
514                 }
515         }
516 }
517
518 void Client::parsePacket(BEIStream &i, uint32_t len) {
519         switch (Utils::getVal<uint8_t>(i)) {
520                 case 0x00:
521                         onChoke();
522                         break;
523                 case 0x01:
524                         onUnchoke();
525                         break;
526                 case 0x02:
527                         onInterested();
528                         break;
529                 case 0x03:
530                         onUninterested();
531                         break;
532                 case 0x04:
533                         onHave(Utils::getVal<uint32_t>(i));
534                         break;
535                 case 0x05:
536                         onBitfield(Utils::getVal<std::string>(i, len - 1));
537                         break;
538                 case 0x06: {
539                         uint32_t index  = Utils::getVal<uint32_t>(i);
540                         uint32_t offset = Utils::getVal<uint32_t>(i);
541                         uint32_t length = Utils::getVal<uint32_t>(i);
542                         onRequest(index, offset, length);
543                         break;
544                 }
545                 case 0x07: {
546                         uint32_t index   = Utils::getVal<uint32_t>(i);
547                         uint32_t offset  = Utils::getVal<uint32_t>(i);
548                         std::string data = i.str().substr(9);
549                         onPiece(index, offset, data);
550                         break;
551                 }
552                 case 0x08: {
553                         uint32_t index  = Utils::getVal<uint32_t>(i);
554                         uint32_t offset = Utils::getVal<uint32_t>(i);
555                         uint32_t length = Utils::getVal<uint32_t>(i);
556                         onCancel(index, offset, length);
557                         break;
558                 }
559                 default:
560                         logTrace(TRACE,
561                                 boost::format(
562                                         "[%s] Received unknown packet: %s"
563                                 ) % m_addr % Utils::hexDump(i.str())
564                         );
565                         break;
566         }
567 }
568
569 void Client::sendHandshake() {
570         std::ostringstream tmp; // no endianess info needed in handshake packet
571         Utils::putVal<uint8_t>(tmp, 19);
572         Utils::putVal<std::string>(tmp, "BitTorrent protocol", 19);
573         Utils::putVal<uint64_t>(tmp, 0); // 8 zero bytes
574         Utils::putVal<std::string>(tmp, m_infoHash.toString(), 20);
575         Utils::putVal<std::string>(tmp, BitTorrent::instance().getId(), 20);
576
577         logTrace(TRACE,
578                 boost::format(COL_SEND "[%s] <= HANDSHAKE " COL_NONE) % m_addr
579         );
580
581         *m_socket << tmp.str();
582         m_handshakeSent = true;
583         sendBitfield();
584 }
585
586 void Client::onPing() {
587         logTrace(TRACE,
588                 boost::format(COL_RECV "[%s] => PING" COL_NONE) % m_addr
589         );
590 }
591
592 void Client::onChoke() {
593         logTrace(TRACE,
594                 boost::format(COL_RECV "[%s] => CHOKE" COL_NONE) % m_addr
595         );
596         m_isChoking = true;
597         setDownloading(false, m_partData);
598         m_usedRange.reset();
599         m_outRequests.clear();
600 }
601
602 void Client::onUnchoke() {
603         logTrace(TRACE,
604                 boost::format(COL_RECV "[%s] => UNCHOKE" COL_NONE) % m_addr
605         );
606         m_isChoking = false;
607         if (m_amInterested && m_partData) {
608                 setDownloading(true, m_partData);
609                 sendRequests();
610         }
611 }
612
613 void Client::onInterested() {
614         logTrace(TRACE,
615                 boost::format(COL_RECV "[%s] => INTERESTED" COL_NONE) % m_addr
616         );
617         m_isInterested = true;
618 }
619
620 void Client::onUninterested() {
621         logTrace(TRACE,
622                 boost::format(COL_RECV "[%s] => UNINTERESTED" COL_NONE) % m_addr
623         );
624         m_isInterested = false;
625 }
626
627 void Client::onHave(uint32_t index) {
628         CHECK_THROW(m_torrent);
629
630         logTrace(TRACE,
631                 boost::format(COL_RECV "[%s] => HAVE %d" COL_NONE)
632                 % m_addr % index
633         );
634         if (m_bitField.size()) try {
635                 m_bitField.at(index) = true;
636         } catch (std::out_of_range&) {
637                 logTrace(TRACE,
638                         boost::format("[%s] sent invalid HAVE message") % m_addr
639                 );
640         }
641         if (m_partData) {
642                 m_partData->addAvail(m_torrent->getChunkSize(), index);
643                 m_sourceMaskAdded = true;
644                 Range64 tmp(
645                         index * m_torrent->getChunkSize(),
646                         (index + 1) * m_torrent->getChunkSize() - 1
647                 );
648                 if (!m_amInterested && !m_partData->isComplete(tmp)) {
649                         checkNeedParts();
650                         if (m_needParts) {
651                                 sendInterested();
652                         }
653                 }
654                 if (m_amInterested && !m_isChoking) {
655                         setDownloading(true, m_partData);
656                         sendRequests();
657                 }
658         }
659 }
660
661 void Client::onBitfield(const std::string &bits) {
662         CHECK_THROW(m_torrent);
663
664         m_bitField.clear();
665         uint32_t trueBits = 0;
666         for (uint32_t i = 0; i < bits.size(); ++i) {
667                 std::bitset<8> b((unsigned)bits[i]);
668                 for (int8_t j = 7; j >= 0; --j) {
669                         m_bitField.push_back(b[j]);
670                         trueBits += b[j];
671                         if (m_bitField.size() == m_torrent->getChunkCnt()) {
672                                 break;
673                         }
674                 }
675                 if (m_bitField.size() == m_torrent->getChunkCnt()) {
676                         break;
677                 }
678         }
679         CHECK_THROW(m_bitField.size() == m_torrent->getChunkCnt());
680
681         logTrace(TRACE,
682                 boost::format(COL_RECV "[%s] => BITFIELD (%5.2f%%)" COL_NONE)
683                 % m_addr % (trueBits * 100.0 / m_bitField.size())
684         );
685
686         if (m_bitField.size() == trueBits) {
687                 // in Hydranode, empty bitfield indicates 'seed'
688                 // note that this is different from BT spec, where empty/missing
689                 // bitfield means the client has nothing at all
690                 m_bitField.clear();
691         }
692
693         if (m_partData) {
694                 m_partData->addSourceMask(m_torrent->getChunkSize(),m_bitField);
695                 m_sourceMaskAdded = true;
696         }
697
698         checkNeedParts();
699
700         if (m_needParts) {
701                 sendInterested();
702         } else {
703                 logTrace(TRACE,
704                         boost::format(
705                                 "[%s] We don't need anything from this client."
706                         ) % m_addr
707                 );
708         }
709 }
710
711 void Client::onRequest(uint32_t index, uint32_t offset, uint32_t length) {
712         CHECK_THROW(m_torrent);
713         if (m_amChoking) { // ignore requests from choked clients
714                 logTrace(TRACE,
715                         boost::format(
716                                 "[%s] Ignoring request (client is choked)"
717                         ) % m_addr
718                 );
719                 return;
720         }
721         logTrace(TRACE,
722                 boost::format(
723                         COL_RECV "[%s] => REQUEST index=%d offset=%d length=%d"
724                         COL_NONE
725                 ) % m_addr % index % offset % length
726         );
727         if (!isUploading()) {
728                 setUploading(true, m_file);
729         }
730         m_requests.push_back(Request(index, offset, length));
731         if (m_socket->isWritable() && m_requests.size() == 1) {
732                 sendNextChunk();
733         }
734 }
735
736 void Client::onPiece(uint32_t index, uint32_t offset, const std::string &data) {
737         if (!m_outRequests.size()) {
738                 sendUninterested();
739                 return;
740         }
741         CHECK_THROW(m_torrent);
742
743         logTrace(TRACE,
744                 boost::format(
745                         COL_RECV "[%s] => PIECE index=%d offset=%d length=%d"
746                         COL_NONE
747                 ) % m_addr % index % offset % data.size()
748         );
749
750         Request r = m_outRequests.front();
751         m_torrent->delRequest(r);
752         m_outRequests.pop_front();
753         if (Request(index, offset, data.size()) == r) try {
754                 r.m_locked->write(r.m_locked->begin(), data);
755         } catch (std::exception &e) {
756                 logDebug(
757                         boost::format("[%s] Writing data: %s") % m_addr
758                         % e.what()
759                 );
760         } else try {
761                 uint64_t beg = index * m_torrent->getChunkSize() + offset;
762                 m_partData->write(beg, data);
763         } catch (std::exception &e) {
764                 (void)e;
765                 logTrace(TRACE,
766                         boost::format("[%s] Ignoring %d bytes duplicate data.")
767                         % m_addr % data.size()
768                 );
769         }
770         sendRequests();
771         m_torrent->addDownloaded(data.size());
772 }
773
774 void Client::onCancel(uint32_t index, uint32_t offset, uint32_t length) {
775         logTrace(TRACE,
776                 boost::format(
777                         COL_RECV "[%s] => CANCEL index=%d offset=%d length=%d"
778                         COL_NONE
779                 ) % m_addr % index % offset % length
780         );
781         std::list<Request>::iterator it = m_requests.begin();
782         Request toCancel(index, offset, length);
783         while (it != m_requests.end()) {
784                 if (*it == toCancel) {
785                         logTrace(TRACE,
786                                 boost::format("[%s] Canceling request.")
787                                 % m_addr
788                         );
789                         m_requests.erase(it);
790                         it = m_requests.begin();
791                 } else {
792                         ++it;
793                 }
794         }
795 }
796
797 void Client::sendNextChunk() try {
798         CHECK_RET(m_requests.size());
799         CHECK_THROW(m_torrent);
800         CHECK_THROW(m_file);
801
802         Request r = m_requests.front();
803         uint64_t begin = m_torrent->getChunkSize() * r.m_index + r.m_offset;
804         std::string data(m_file->read(begin, begin + r.m_length - 1));
805         assert(data.size() == r.m_length);
806         sendPiece(r.m_index, r.m_offset, data);
807         m_requests.pop_front();
808
809         updateSignals();
810         m_torrent->addUploaded(data.size());
811         TorrentFile *file = dynamic_cast<TorrentFile*>(m_file);
812         if (file) try {
813                 SharedFile *curFile = file->getContains(
814                         Range64(begin, data.size())
815                 );
816                 curFile->addUploaded(data.size());
817         } catch (...) {}
818
819 } catch (SharedFile::ReadError &e) {
820         if (e.reason() == SharedFile::ETRY_AGAIN_LATER) {
821                 Utils::timedCallback(
822                         boost::bind(&Client::sendNextChunk, this), 3000
823                 );
824         } else {
825                 logError(
826                         boost::format("[%s] Sending next chunk: %s")
827                         % m_addr % e.what()
828                 );
829         }
830 } catch (std::exception &e) {
831         logError(
832                 boost::format("[%s] Sending next chunk: %s")
833                 % m_addr % e.what()
834         );
835 } MSVC_ONLY(;)
836
837 void Client::setTorrent(Torrent *t) {
838         assert(!m_torrent || m_torrent == t);
839
840         m_torrent = t;
841         m_infoHash = t->getInfoHash();
842 }
843
844 void Client::setFile(SharedFile *file) {
845         assert(!m_file || m_file == file);
846
847         m_file = file;
848 }
849
850 void Client::setFile(PartData *file) {
851         assert(!m_partData || m_partData == file);
852
853         m_partData = file;
854         if (m_partData) {
855                 m_partData->onVerified.connect(
856                         boost::bind(&Client::onVerified, this, _1, _2, _3)
857                 );
858                 m_partData->onDestroyed.connect(
859                         boost::bind(&Client::onDestroyed, this, _1)
860                 );
861         }
862 }
863
864 void Client::checkNeedParts() {
865         m_needParts = false;
866         if (m_partData) {
867                 CHECK_THROW(m_torrent);
868                 ::Detail::UsedRangePtr ret;
869                 ret = m_partData->getRange(
870                         m_torrent->getChunkSize(), m_bitField
871                 );
872                 if (ret) {
873                         m_needParts = true;
874                 } else {
875                         logDebug(
876                                 boost::format(
877                                         "[%s] Client has no needed chunks."
878                                 ) % m_addr
879                         );
880                 }
881         }
882 }
883
884 void Client::onVerified(PartData *file, uint32_t chunkSize, uint32_t chunk) {
885         CHECK_RET(file == m_partData);
886         CHECK_THROW(m_torrent);
887
888         if (!m_socket->isConnected()) {
889                 return;
890         }
891
892         if (chunkSize == m_torrent->getChunkSize()) {
893                 sendHave(chunk);
894         }
895
896         uint32_t removed = 0;
897         std::deque<Request>::iterator i(m_outRequests.begin());
898         while (i != m_outRequests.end()) {
899                 if ((*i).m_index == chunk) {
900                         sendCancel((*i).m_index, (*i).m_offset, (*i).m_length);
901                         m_outRequests.erase(i);
902                         i = m_outRequests.begin();
903                         ++removed;
904                 } else {
905                         ++i;
906                 }
907         }
908         if (removed) {
909                 logTrace(TRACE,
910                         boost::format("[%s] Canceled %d requests from queue.")
911                         % m_addr % removed
912                 );
913         }
914         if (m_amInterested && !m_isChoking) {
915                 sendRequests();
916         }
917 }
918
919 /**
920  * When adding multiple requests in one go, we want to add the requests to
921  * the Torrent object in FILO ordering, AFTER all requests have been generated.
922  * Hence, this object temporarly stores the requests made, and upon destruction,
923  * submits them to the specified Torrent object (in FILO ordering).
924  */
925 struct RequestAdder : public std::stack<Client::Request> {
926         RequestAdder(Torrent *t) : m_t(t) {}
927         ~RequestAdder() {
928                 while (size()) {
929                         m_t->addRequest(top()); pop();
930                 }
931         }
932         Torrent *m_t;
933 };
934
935 void Client::sendRequests() try {
936         CHECK_THROW(m_partData);
937         CHECK_THROW(m_torrent);
938
939         boost::scoped_ptr<RequestAdder> toAdd(new RequestAdder(m_torrent));
940         while (m_outRequests.size() < 5 && m_partData) {
941                 if (!m_usedRange) {
942                         m_usedRange = m_partData->getRange(
943                                 m_torrent->getChunkSize(), m_bitField
944                         );
945                 }
946                 if (!m_usedRange) {
947                         Request r(m_torrent->getRequest(m_bitField));
948                         std::deque<Request>::iterator it(m_outRequests.begin());
949                         while (it != m_outRequests.end()) {
950                                 CHECK_THROW(*it != r);
951                                 ++it;
952                         }
953                         sendRequest(r.m_index, r.m_offset, r.m_length);
954                         m_outRequests.push_back(r);
955                 }
956                 if (m_usedRange) {
957                         ::Detail::LockedRangePtr l(m_usedRange->getLock(16384));
958                         if (!l) {
959                                 m_usedRange.reset();
960                                 continue;
961                         }
962                         Request r(l, m_torrent);
963                         sendRequest(r.m_index, r.m_offset, r.m_length);
964                         m_outRequests.push_back(r);
965                         if (m_socket->getDownSpeed() < 1024) {
966                                 toAdd->push(r);
967                         } else {
968                                 logTrace(TRACE,
969                                         boost::format("[%s] <- Fast source.")
970                                         % m_addr
971                                 );
972                         }
973         &nbs