root/hydranode/hncore/partdata.cpp

Revision 3013, 45.3 kB (checked in by madcat, 2 years ago)

If there are no chunks in the file, fall back to standard getNextChunk() method when generating used range.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1 /*
2  *  Copyright (C) 2004-2006 Alo Sarv <madcat_@users.sourceforge.net>
3  *
4  *  This program is free software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License as published by
6  *  the Free Software Foundation; either version 2 of the License, or
7  *  (at your option) any later version.
8  *
9  *  This program is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  You should have received a copy of the GNU General Public License
15  *  along with this program; if not, write to the Free Software
16  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17  */
18
19 /**
20  * \file partdata.cpp Implementation of PartData and related classes
21  */
22
23 #include <hncore/pch.h>
24
25 #include <hnbase/log.h>
26 #include <hnbase/lambda_placeholders.h>
27 #include <hnbase/hash.h>
28 #include <hnbase/prefs.h>
29 #include <hnbase/timed_callback.h>
30
31 #include <hncore/partdata.h>
32 #include <hncore/partdata_impl.h>
33 #include <hncore/metadata.h>
34 #include <hncore/hasher.h>
35 #include <hncore/hydranode.h>
36
37 #include <boost/lambda/lambda.hpp>
38 #include <boost/lambda/if.hpp>
39 #include <boost/lambda/bind.hpp>
40 #include <boost/filesystem/operations.hpp>
41 #include <boost/algorithm/string/replace.hpp>
42
43 #include <fstream>
44 #include <fcntl.h>
45
46 using namespace boost::lambda;
47 using namespace boost::multi_index;
48 using namespace CGComm;
49
50 static const uint32_t BUF_SIZE_LIMIT = 512*1024; //!< 512k buffer
51
52 namespace Detail {
53
54 // UsedRange class
55 // -------------------------
56 template<typename IterType>
57 UsedRange::UsedRange(PartData *parent, IterType it) : Range64((*it).begin(),
58 (*it).end()), m_parent(parent),
59 m_chunk(new PosIter(project<ID_Pos>(*m_parent->m_chunks, it))) {
60         CHECK_THROW(parent != 0);
61         CHECK_THROW(*m_chunk != get<ID_Pos>(*parent->m_chunks).end());
62
63         // SpamReduction(tm)
64         if (length() > 1) {
65                 logTrace(TRACE_PARTDATA,
66                         boost::format("%s: Using range %d..%d")
67                         % m_parent->m_dest.leaf() % begin() % end()
68                 );
69         }
70
71         get<ID_Pos>(*m_parent->m_chunks).modify(
72                 *m_chunk, ++bind(&Chunk::m_useCnt, __1)
73         );
74 }
75
76 UsedRange::UsedRange(PartData *parent, uint64_t begin, uint64_t end) :
77 Range64(begin, end), m_parent(parent) {
78         m_chunk.reset(new PosIter(get<ID_Pos>(*m_parent->m_chunks).end()));
79
80         if (length() > 1) {
81                 logTrace(TRACE_PARTDATA,
82                         boost::format("%s: Using range %d..%d")
83                         % m_parent->m_dest.leaf() % this->begin() % this->end()
84                 );
85         }
86 }
87
88 UsedRange::~UsedRange() {
89         if (length() > 1) {
90                 logTrace(TRACE_PARTDATA,
91                         boost::format("%s: Un-using range %d..%d")
92                         % m_parent->m_dest.leaf() % begin() % end()
93                 );
94         }
95
96         if (*m_chunk != m_parent->m_chunks->get<ID_Pos>().end()) {
97                 get<ID_Pos>(*m_parent->m_chunks).modify(
98                         *m_chunk, --bind(&Chunk::m_useCnt, __1)
99                 );
100         }
101 }
102
103 LockedRangePtr UsedRange::getLock(uint32_t size) {
104         return m_parent->getLock(shared_from_this(), size);
105 }
106
107 bool UsedRange::isComplete() const {
108         return m_parent->isComplete(*this);
109 }
110
111 // LockedRange class
112 // ---------------------------
113 LockedRange::LockedRange(PartData *parent, Range64 r, UsedRangePtr used)
114 : Range64(r), m_parent(parent),
115 m_chunk(new PosIter(get<ID_Pos>(*m_parent->m_chunks).end())), m_used(used) {
116
117         // SpamReduction(tm)
118         if (length() > 1) {
119                 logTrace(TRACE_PARTDATA,
120                         boost::format("%s: Locking range %d..%d")
121                         % m_parent->m_dest.leaf() % begin() % end()
122                 );
123         }
124
125         m_parent->m_locked.merge(*this);
126 }
127
128 LockedRange::LockedRange(
129         PartData *parent, Range64 r, PosIter &it, UsedRangePtr used
130 ) : Range64(r), m_parent(parent), m_chunk(new PosIter(it)), m_used(used) {
131         CHECK_THROW(parent != 0);
132
133         if (length() > 1) {
134                 logTrace(TRACE_PARTDATA,
135                         boost::format("%s: Locking range %d..%d")
136                         % m_parent->m_dest.leaf() % begin() % end()
137                 );
138         }
139
140         m_parent->m_locked.merge(*this);
141 }
142
143 LockedRange::~LockedRange() {
144         if (length() > 1) {
145                 logTrace(TRACE_PARTDATA,
146                         boost::format("%s: UnLocking range %d..%d")
147                         % m_parent->m_dest.leaf() % begin() % end()
148                 );
149         }
150
151         m_parent->m_locked.erase(*this);
152 }
153
154 void LockedRange::write(uint64_t begin, const std::string &data) {
155         if (begin > end() || begin + data.size() - 1 > end()) {
156                 throw PartData::LockError("Writing outside lock.");
157         }
158         if (*m_chunk != get<ID_Pos>(*m_parent->m_chunks).end()) {
159                 m_parent->m_chunks->get<ID_Pos>().modify(
160                         *m_chunk, bind(&Chunk::write, __1, begin, data)
161                 );
162         } else {
163                 m_parent->doWrite(begin, data);
164                 m_parent->tryComplete();
165         }
166 }
167
168 // Chunk class
169 // ---------------------
170 Chunk::Chunk(
171         PartData *parent, uint64_t begin, uint64_t end,
172         uint32_t size, const HashBase *hash
173 ) : Range64(begin, end), m_parent(parent), m_hash(hash), m_verified(
174         m_parent->isVerified(*this)
175 ), m_partial(
176         m_parent->m_complete.contains(*this) &&
177         !m_parent->m_complete.containsFull(*this)
178 ), m_complete(m_parent->isComplete(*this)), m_avail(), m_useCnt(), m_size(size)
179 {
180         if (isComplete() && !isVerified() && hash) {
181                 HashWorkPtr c = m_parent->verifyRange(*this, m_hash);
182                 if (c) {
183                         c->getEventTable().addHandler(
184                                 c, this, &Chunk::onHashEvent
185                         );
186                 }
187         }
188 }
189
190 Chunk::Chunk(
191         PartData *parent, Range64 range, uint32_t size, const HashBase *hash
192 ) : Range64(range), m_parent(parent), m_hash(hash), m_verified(
193         m_parent->isVerified(*this)
194 ), m_partial(
195         m_parent->m_complete.contains(*this) &&
196         !m_parent->m_complete.containsFull(*this)
197 ), m_complete(m_parent->isComplete(*this)), m_avail(), m_useCnt(), m_size(size)
198 {
199         if (isComplete() && !isVerified() && hash) {
200                 HashWorkPtr c = m_parent->verifyRange(*this, m_hash);
201                 if (c) {
202                         c->getEventTable().addHandler(
203                                 c, this, &Chunk::onHashEvent
204                         );
205                 }
206         }
207 }
208
209 void Chunk::write(uint64_t begin, const std::string &data) {
210         m_parent->doWrite(begin, data);
211         if (!m_partial) {
212                 CMPosIndex &idx = m_parent->m_chunks->get<ID_Pos>();
213                 CMPosIndex::iterator it = idx.find(*this);
214                 assert(it != idx.end());
215                 idx.modify(it, bind(&Chunk::m_partial, __1) = true);
216         }
217 }
218
219 void Chunk::updateState() const {
220         CMPosIndex &idx = m_parent->m_chunks->get<ID_Pos>();
221         CMPosIndex::iterator it = idx.find(*this);
222         assert(it != idx.end());
223         if (m_parent->m_complete.contains(*this)) {
224                 if (m_parent->m_complete.containsFull(*this)) {
225                         idx.modify(it, bind(&Chunk::m_partial, __1) = false);
226                         idx.modify(it, bind(&Chunk::m_complete, __1) = true);
227                 } else {
228                         idx.modify(it, bind(&Chunk::m_partial, __1) = true);
229                         idx.modify(it, bind(&Chunk::m_complete, __1) = false);
230                 }
231         } else {
232                 idx.modify(it, bind(&Chunk::m_partial, __1) = false);
233                 idx.modify(it, bind(&Chunk::m_complete, __1) = false);
234         }
235         idx.modify(
236                 it, bind(&Chunk::m_verified, __1) = m_parent->isVerified(*this)
237         );
238         if (isComplete() && !isVerified()) {
239                 const_cast<Chunk*>(this)->verify();
240         }
241 }
242
243 void Chunk::verify(bool save) {
244         logTrace(TRACE_PARTDATA,
245                 boost::format("%s: Completed chunk %d..%d")
246                 % m_parent->m_dest.leaf() % this->begin() % this->end()
247         );
248
249         CMPosIndex &idx = m_parent->m_chunks->get<ID_Pos>();
250         CMPosIndex::iterator it = idx.find(*this);
251         assert(it != idx.end());
252         idx.modify(it, bind(&Chunk::m_partial, __1) = false);
253         idx.modify(it, bind(&Chunk::m_verified, __1) = false);
254         idx.modify(it, bind(&Chunk::m_complete, __1) = true);
255
256         if (m_hash) {
257                 HashWorkPtr c = m_parent->verifyRange(*this, m_hash, save);
258                 if (c) {
259                         c->getEventTable().addHandler(
260                                 c, this, &Chunk::onHashEvent
261                         );
262                 }
263         } else {
264                 logWarning(
265                         boost::format(
266                                 "%s: Chunk %s..%s has no hash - "
267                                 "shouldn't happen..."
268                         ) % m_parent->m_dest.leaf() %this->begin() % this->end()
269                 );
270                 m_parent->tryComplete();
271         }
272 }
273
274 void Chunk::onHashEvent(HashWorkPtr c, HashEvent evt){
275         assert(*c->getRef() == *getHash());
276         c->getEventTable().delHandlers(c);
277
278         CMPosIndex &idx = m_parent->m_chunks->get<ID_Pos>();
279         CMPosIndex::iterator it = idx.find(*this);
280         assert(it != idx.end());
281         if (evt == HASH_FAILED) {
282                 m_parent->corruption(*this);
283                 idx.modify(it, bind(&Chunk::m_verified, __1) = false);
284                 idx.modify(it, bind(&Chunk::m_complete, __1) = false);
285                 idx.modify(it, bind(&Chunk::m_partial, __1) = false);
286                 if (m_parent->m_fullJob) {
287                         m_parent->m_fullJob->cancel();
288                         m_parent->m_fullJob = HashWorkPtr();
289                 }
290                 m_parent->m_partStatus[m_size][begin()/m_size] = false;
291                 logTrace(TRACE_CHUNKS,
292                         boost::format("Chunk #%d is corrupt.")
293                         % (begin() / m_size)
294                 );
295                 assert(!isComplete());
296         } else if (evt == HASH_VERIFIED) {
297                 idx.modify(it, bind(&Chunk::m_complete, __1) = true);
298                 idx.modify(it, bind(&Chunk::m_verified, __1) = true);
299                 idx.modify(it, bind(&Chunk::m_partial, __1) = false);
300                 m_parent->m_partStatus[m_size][begin()/m_size] = true;
301                 m_parent->m_verified.merge(*this);
302                 m_parent->m_complete.merge(*this);
303                 m_parent->onVerified(m_parent, m_size, begin()/m_size);
304                 logTrace(TRACE_CHUNKS,
305                         boost::format("Verified chunk #%d") % (begin() / m_size)
306                 );
307         } else if (evt == HASH_FATAL_ERROR) {
308                 logError(
309                         boost::format("Fatal error hashing file `%s'")
310                         % c->getFileName().native_file_string()
311                 );
312         }
313         --m_parent->m_pendingHashes;
314         m_parent->tryComplete();
315 }
316
317 /**
318  * AllocJob allocates disk space for temp file. Emits event 'true' when
319  * allocation succeeds, false otherwise.
320  */
321 class AllocJob : public ThreadWork {
322 public:
323         DECLARE_EVENT_TABLE(AllocJobPtr, bool);
324         AllocJob(const boost::filesystem::path &file, uint64_t size);
325         virtual bool process();
326 private:
327         boost::filesystem::path m_file;
328         uint64_t m_size;
329 };
330 IMPLEMENT_EVENT_TABLE(AllocJob, AllocJobPtr, bool);
331
332 AllocJob::AllocJob(const boost::filesystem::path &file, uint64_t size)
333 : m_file(file), m_size(size) {}
334
335 // allocates disk space in 10mb increments
336 bool AllocJob::process() {
337         int fd = open(m_file.native_file_string().c_str(), O_RDWR|O_BINARY);
338         if (!fd) {
339                 getEventTable().postEvent(AllocJobPtr(this), false);
340         } else {
341                 ::lseek64(fd, m_size - 1, SEEK_SET);
342                 ::write(fd, "1", 1);
343                 ::fsync(fd);
344                 ::close(fd);
345                 assert(Utils::getFileSize(m_file) <= m_size);
346                 bool ret = Utils::getFileSize(m_file) == m_size;
347                 getEventTable().postEvent(AllocJobPtr(this), ret);
348         }
349         setComplete();
350         return true;
351 }
352 } // namespace Detail
353
354 using namespace Detail;
355
356 // PartData exception classes
357 // --------------------------
358 PartData::LockError::LockError(const std::string &msg):std::runtime_error(msg){}
359 PartData::RangeError::RangeError(const std::string &mg):std::runtime_error(mg){}
360
361 // PartData class
362 // --------------
363 IMPLEMENT_EVENT_TABLE(PartData, PartData*, int);
364
365 PartData::PartData(
366         uint64_t size,
367         const boost::filesystem::path &loc,
368         const boost::filesystem::path &dest
369 ) : Object(0), m_size(size), m_loc(loc), m_dest(dest), m_chunks(new ChunkMap),
370 m_toFlush(), m_md(), m_pendingHashes(), m_sourceCnt(), m_fullSourceCnt(),
371 m_paused(), m_stopped(), m_autoPaused() {
372         initSignals();
373
374         std::ofstream o(loc.string().c_str(), std::ios::binary);
375         o.flush();
376         getEventTable().postEvent(this, PD_ADDED);
377         cleanupName();
378 }
379
380
381 PartData::PartData(const boost::filesystem::path &p) try : Object(0),
382 m_size(), m_chunks(new ChunkMap), m_toFlush(), m_md(), m_pendingHashes(),
383 m_sourceCnt(), m_fullSourceCnt(), m_paused(), m_stopped(), m_autoPaused() {
384         initSignals();
385
386         logTrace(TRACE_PARTDATA,
387                 boost::format("Loading temp file: %s") % p.string()
388         );
389         using namespace Utils;
390         using boost::algorithm::replace_last;
391
392         std::string tmp(p.string());
393         replace_last(tmp, ".dat.bak", ""); // in case of backups
394         replace_last(tmp, ".dat", "");
395
396         m_loc = boost::filesystem::path(tmp, boost::filesystem::native);
397
398         std::ifstream ifs(p.string().c_str(), std::ios::binary);
399         CHECK_THROW(ifs);
400         CHECK_THROW(Utils::getVal<uint8_t>(ifs) == OP_PARTDATA);
401
402         (void)Utils::getVal<uint16_t>(ifs);
403         if (Utils::getVal<uint8_t>(ifs) != OP_PD_VER) {
404                 logWarning("Unknown partdata version.");
405         }
406         m_size = Utils::getVal<uint64_t>(ifs);
407
408         uint16_t tagc = Utils::getVal<uint16_t>(ifs);
409
410         while (tagc-- && ifs) {
411                 uint8_t   oc = getVal<uint8_t>(ifs);
412                 uint16_t len = getVal<uint16_t>(ifs);
413                 switch (oc) {
414                         case OP_PD_DESTINATION:
415                                 m_dest = getVal<std::string>(ifs).value();
416                                 break;
417                         case OP_PD_COMPLETED:
418                                 if (Utils::getVal<uint8_t>(ifs)!=OP_RANGELIST) {
419                                         logWarning("Invalid tag.");
420                                         ifs.seekg(len, std::ios::cur);
421                                 } else {
422                                         (void)Utils::getVal<uint16_t>(ifs);
423                                         m_complete = RangeList64(ifs);
424                                 }
425                                 break;
426                         case OP_PD_VERIFIED:
427                                 if (Utils::getVal<uint8_t>(ifs)!=OP_RANGELIST) {
428                                         logWarning("Invalid tag.");
429                                         ifs.seekg(len, std::ios::cur);
430                                 } else {
431                                         (void)Utils::getVal<uint16_t>(ifs);
432                                         m_verified = RangeList64(ifs);
433                                 }
434                                 break;
435                         case OP_PD_STATE: {
436                                 uint8_t state = Utils::getVal<uint8_t>(ifs);
437                                 if (state == STATE_STOPPED) {
438                                         m_stopped = true;
439                                 } else if (state == STATE_PAUSED) {
440                                         m_paused = true;
441                                 }
442                                 break;
443                         }
444                         default:
445                                 logWarning("Unhandled tag in PartData.");
446                                 ifs.seekg(len, std::ios::cur);
447                                 break;
448                 }
449         }
450
451         if (ifs && Utils::getVal<uint8_t>(ifs) == OP_METADATA) {
452                 (void)Utils::getVal<uint16_t>(ifs);
453                 m_md = new MetaData(ifs);
454         }
455
456         cleanupName();
457 //      printCompleted();
458
459 } catch (std::runtime_error &) {
460         delete m_md;
461 }
462 MSVC_ONLY(;)
463
464 PartData::PartData(
465         const boost::filesystem::path &path, MetaData *md,
466         RangeList64 completed, RangeList64 verified
467 ) : Object(0), m_size(md->getSize()), m_loc(path), m_complete(completed),
468 m_verified(verified), m_chunks(new ChunkMap), m_toFlush(), m_md(md),
469 m_pendingHashes(), m_sourceCnt(), m_fullSourceCnt(), m_paused(), m_stopped(),
470 m_autoPaused() {
471         initSignals();
472
473         for (uint32_t i = 0; i < md->getHashSetCount(); ++i) {
474                 HashSetBase *hs = md->getHashSet(i);
475                 if (hs->getChunkSize() && hs->getChunkCnt()) {
476                         addHashSet(hs);
477                 }
478         }
479
480         std::string incDir(
481                 Prefs::instance().read<std::string>("/Incoming", "")
482         );
483
484         setDestination(boost::filesystem::path(incDir) / md->getName());
485         save();
486         getEventTable().postEvent(this, PD_ADDED);
487 }
488
489 // protected default constructor
490 PartData::PartData() : Object(0), m_size(), m_chunks(new ChunkMap), m_toFlush(),
491 m_md(), m_pendingHashes(), m_sourceCnt(), m_fullSourceCnt(), m_paused(),
492 m_stopped(), m_autoPaused() {
493         initSignals();
494 }
495
496 PartData::~PartData() {
497         getEventTable().delHandlers(this);
498 }
499
500 // boost::signals breaks when the first time the signal is invoced it's done
501 // from different module than the one where it was originally created in. while
502 // the majority of these signals are only invoced from inside partdata, they
503 // are part of partdata's public api, so anyone can invoke them (sadly), hence
504 // make sure they get properly initalized here.
505 //
506 // since many of the signals duplicate events, we can make our life bit easier
507 // here, and connect the postevent calls to the signals, so we don't have to do
508 // both things everywhere (which would quickly become errorprone).
509 void PartData::initSignals() {
510         onPaused.connect(
511                 boost::bind(
512                         &EventTable<PartData*, int>::postEvent,
513                         &getEventTable(), _b1, PD_PAUSED
514                 )
515         );
516         onStopped.connect(
517                 boost::bind(
518                         &EventTable<PartData*, int>::postEvent,
519                         &getEventTable(), _b1, PD_STOPPED
520                 )
521         );
522         onResumed.connect(
523                 boost::bind(
524                         &EventTable<PartData*, int>::postEvent,
525                         &getEventTable(), _b1, PD_RESUMED
526                 )
527         );
528         onCompleted.connect(
529                 boost::bind(
530                         &EventTable<PartData*, int>::postEvent,
531                         &getEventTable(), _b1, PD_COMPLETE
532                 )
533         );
534         onCanceled.connect(
535                 boost::bind(
536                         &EventTable<PartData*, int>::postEvent,
537                         &getEventTable(), _b1, PD_CANCELED
538                 )
539         );
540 }
541
542 void PartData::init() {
543         CHECK_RET(m_md);
544
545         // add hashes from metadata
546         for (uint32_t i = 0; i < m_md->getHashSetCount(); ++i) {
547                 HashSetBase *hs = m_md->getHashSet(i);
548                 if (hs->getChunkSize() && hs->getChunkCnt()) {
549                         addHashSet(hs);
550                 }
551         }
552
553         // It's possible we loaded a complete file - if so, re-try completing.
554         if (isComplete()) {
555                 Utils::timedCallback(
556                         boost::bind(&PartData::tryComplete, this), 0
557                 );
558         } else {
559                 uint32_t modDate = Utils::getModDate(m_loc);
560                 bool needRehash = false;
561                 if (m_md && modDate != m_md->getModDate()) {
562                         // allow 1ms errors (FAT32 causes them)
563                         if (modDate + 1 != m_md->getModDate()) {
564                                 needRehash = true;
565                         }
566                 }
567                 if (needRehash) {
568                         logMsg(
569                                 boost::format(
570                                         "%s: Modification date changed "
571                                         "(%d != %d), rehashing completed parts."
572                                 ) % m_dest.leaf() % modDate % m_md->getModDate()
573                         );
574                         rehashCompleted();
575                 }
576         }
577
578         getEventTable().postEvent(this, PD_ADDED);
579 }
580
581 void PartData::rehashCompleted() {
582         CMPosIndex &idx = m_chunks->get<ID_Pos>();
583         for (CMPosIndex::iterator i = idx.begin(); i != idx.end(); ++i) {
584                 if ((*i).getHash()) {
585                         idx.modify(i, bind(&Chunk::verify, __1, false));
586                 }
587         }
588 }
589
590 void PartData::addSourceMask(
591         uint64_t chunkSize, const std::vector<bool> &chunks
592 ) {
593         typedef CMLenIndex::iterator Iter;
594
595         ++m_sourceCnt;
596         if (chunks.empty()) {
597                 addFullSource(chunkSize);
598                 return;
599         }
600
601         // ed2k has 1 more chunk if size == N * chunkSize
602         uint64_t expected = getChunkCount(chunkSize);
603         if (m_size % chunkSize == 0 && chunkSize == ED2K_PARTSIZE) {
604                 ++expected;
605         }
606         if (chunks.size() != expected) {
607                 boost::format err(
608                         "Invalid number of values in chunkmap "
609                         "(expected %d, got %d)"
610                 );
611                 err % expected % chunks.size();
612                 throw std::runtime_error(err.str());
613         }
614
615         checkAddChunkMap(chunkSize);
616         int i = 0;
617         CMLenIndex& pi = m_chunks->get<ID_Length>();
618         std::pair<Iter, Iter> ret = pi.equal_range(chunkSize);
619         for (Iter j = ret.first; j != ret.second; ++j) {
620                 pi.modify(j, bind(&Chunk::m_avail, __1) += chunks[i++]);
621         }
622 }
623
624 void PartData::addFullSource(uint64_t chunkSize) {
625         typedef CMLenIndex::iterator Iter;
626
627         checkAddChunkMap(chunkSize);
628         CMLenIndex& pi = m_chunks->get<ID_Length>();
629         std::pair<Iter, Iter> ret = pi.equal_range(chunkSize);
630         for (Iter i = ret.first; i != ret.second; ++i) {
631                 pi.modify(i, ++bind(&Chunk::m_avail, __1));
632         }
633         ++m_fullSourceCnt;
634 }
635 void PartData::delSourceMask(
636         uint64_t chunkSize, const std::vector<bool> &chunks
637 ) {
638         typedef CMLenIndex::iterator Iter;
639
640         CHECK_THROW(m_sourceCnt);
641         --m_sourceCnt;
642         if (chunks.empty()) {
643                 delFullSource(chunkSize);
644                 return;
645         }
646
647         // ed2k has 1 more chunk if size == N * chunkSize
648         uint64_t expected = getChunkCount(chunkSize);
649         if (m_size % chunkSize == 0 && chunkSize == ED2K_PARTSIZE) {
650                 ++expected;
651         }
652         if (chunks.size() != expected) {
653                 boost::format err(
654                         "Invalid number of values in chunkmap "
655                         "(expected %d, got %d)"
656                 );
657                 err % expected % chunks.size();
658                 throw std::runtime_error(err.str());
659         }
660
661         int i = 0;
662         CMLenIndex& pi = m_chunks->get<ID_Length>();
663         std::pair<Iter, Iter> ret = pi.equal_range(chunkSize);
664         for (Iter j = ret.first; j != ret.second; ++j) {
665                 if(chunks[i]) { // ensures we don't integer underflow
666                         CHECK_THROW((*j).m_avail);
667                 }
668                 pi.modify(j, bind(&Chunk::m_avail, __1) -= chunks[i++]);
669         }
670 }
671 void PartData::delFullSource(uint64_t chunkSize) {
672         typedef CMLenIndex::iterator Iter;
673
674         CHECK_THROW(m_fullSourceCnt);
675         --m_fullSourceCnt;
676         CMLenIndex& pi = m_chunks->get<ID_Length>();
677         std::pair<Iter, Iter> ret = pi.equal_range(chunkSize);
678         for (Iter i = ret.first; i != ret.second; ++i) {
679                 CHECK_THROW((*i).m_avail);
680                 pi.modify(i, --bind(&Chunk::m_avail, __1));
681         }
682 }
683
684 /**
685  * Unary function object for usage with doGetRange() method
686  *
687  * @param r  Ignored
688  * @returns Always true
689  */
690 struct TruePred { bool operator()(const Range64 &) { return true; } };
691
692 /**
693  * Unary function object for usage with doGetRange() method, in order to check
694  * if the generated range is contained within the chunkmap contained within this
695  * functor.
696  *
697  * @param r    Range candidate to be checked
698  * @return     True if @param r is contained within m_rl, false otherwise
699  */
700 struct CheckPred {
701         CheckPred(const std::vector<bool> &chunks, uint32_t chunkSize)
702         : m_chunks(chunks), m_chunkSize(chunkSize) {}
703
704         bool operator()(const Range64 &r) {
705                 if (r.begin() > m_chunks.size() * m_chunkSize) {
706                         return false;
707                 }
708                 uint32_t chunk1 = r.begin() / m_chunkSize;
709                 uint32_t chunk2 = r.end() / m_chunkSize + 1;
710                 while (chunk1 != chunk2) {
711                         if (!m_chunks[chunk1++]) {
712                                 return false;
713                         }
714                 }
715                 return true;
716         }
717         const std::vector<bool> &m_chunks;
718         uint64_t m_chunkSize;
719 };
720
721
722 UsedRangePtr PartData::getRange(uint32_t size) {
723         TruePred pred;
724         return doGetRange(size, pred);
725 }
726
727 UsedRangePtr PartData::getRange(
728         uint32_t size, const std::vector<bool> &chunks
729 ) {
730         if (chunks.empty()) {
731                 TruePred pred;
732                 return doGetRange(size, pred);
733         }
734         CheckPred pred(chunks, size);
735         return doGetRange(size, pred);
736 }
737
738 template<typename Predicate>
739 UsedRangePtr PartData::getNextChunk(uint64_t size, Predicate &pred) {
740         UsedRangePtr ret;
741         if (m_complete.empty()) {
742                 ret = UsedRangePtr(new UsedRange(this, 0, size));
743                 if (ret->end() > m_size - 1) {
744                         ret->end(m_size - 1);
745                 }
746         } else if (m_complete.front().begin() > 0) {
747                 uint64_t end = m_complete.front().begin() - 1;
748                 ret = UsedRangePtr(new UsedRange(this, 0, end));
749         } else if (m_complete.front().end() < m_size) {
750                 uint64_t beg = m_complete.front().end() + 1;
751                 uint64_t end = beg + size > m_size ? m_size : beg +size;
752                 ret = UsedRangePtr(new UsedRange(this, beg, end));
753         }
754         if (!ret->getLock(1)) {
755                 ret.reset();
756         }
757         return ret;
758 }
759
760 template<typename Predicate>
761 UsedRangePtr PartData::doGetRange(uint64_t size, Predicate &pred) {
762         typedef CMSelectIndex::iterator SIter;
763
764         if (m_chunks->empty()) {
765                 return getNextChunk(size, pred);
766         }
767         CMSelectIndex &idx = m_chunks->template get<ID_Selector>();
768         std::pair<SIter, SIter> r = idx.equal_range(
769                 boost::make_tuple(false, true)
770         );
771         if (!std::distance(r.first, r.second)) {
772                 return getNextChunk(size, pred);
773         }
774
775         uint32_t hopCnt = 0;
776         for (SIter i = r.first; i != r.second; ++i) {
777                 if (pred(*i) && canLock(*i)) {
778                         UsedRangePtr ret(new UsedRange(this, i));
779                         assert(ret->getLock(1));
780                         logTrace(TRACE_CHUNKS,
781                                 boost::format(
782                                         "Selected chunk #%d (avail=%d, "
783                                         "usecnt=%d partial=%s complete="
784                                         "%s) in %d hops"
785                                 ) % ((*i).begin() / (*i).m_size)
786                                 % (*i).m_avail % (*i).m_useCnt
787                                 % ((*i).m_partial ? "yes" : "no")
788                                 % ((*i).m_complete ? "yes" : "no")
789                                 % hopCnt
790                         );
791                         return ret;
792                 }
793                 ++hopCnt;
794         }
795         logTrace(TRACE_CHUNKS,
796                 boost::format("Failure to select chunk, did %d hops") % hopCnt
797         );
798         return UsedRangePtr();
799 }
800
801 uint64_t PartData::getChunkCount(uint64_t chunkSize) const {
802         return m_size / chunkSize + (m_size % chunkSize ? 1 : 0);
803 }
804
805 void PartData::checkAddChunkMap(uint64_t cs) {
806         typedef CMLenIndex::iterator LIter;
807
808         logTrace(TRACE_PARTDATA, boost::format("Adding chunkmap (cs=%d)") % cs);
809
810         if (m_partStatus.find(cs) == m_partStatus.end()) {
811                 m_partStatus[cs] = std::vector<bool>(getChunkCount(cs));
812         }
813         std::pair<LIter, LIter> ret =m_chunks->get<ID_Length>().equal_range(cs);
814         if (ret.first == m_chunks->get<ID_Length>().end()) {
815                 for (uint64_t i = 0; i < getChunkCount(cs); ++i) {
816                         uint64_t beg = i * cs;
817                         uint64_t end = (i + 1) * cs - 1;
818                         if (end > m_size - 1) {
819                                 end = m_size - 1;
820                         }
821                         Chunk c(this, beg, end, cs);
822                         m_chunks->insert(c);
823                         m_partStatus[cs][i] = isComplete(beg, end);
824                 }
825         }
826 }
827
828 void PartData::addHashSet(const HashSetBase *hs) {
829         if (!getSize()) {
830                 logDebug("Cannot add hashes to files of size 0.");
831                 return;
832         }
833
834         CHECK_THROW(hs->getChunkSize() > 0);
835         typedef CMLenIndex::iterator LIter;
836
837         logTrace(TRACE_PARTDATA,
838                 boost::format("Adding hashset (cs=%d cc=%d)")
839                 % hs->getChunkSize() % hs->getChunkCnt()
840         );
841
842         uint32_t cs = hs->getChunkSize();
843         std::pair<LIter, LIter> ret =m_chunks->get<ID_Length>().equal_range(cs);
844
845         if (ret.first == m_chunks->get<ID_Length>().end()) {
846                 checkAddChunkMap(cs);
847                 ret = m_chunks->get<ID_Length>().equal_range(cs);
848         }
849
850         int cc = hs->getChunkCnt();
851         int check = std::distance(ret.first, ret.second);
852
853         // file of size of multiple of chunksize has one more chunkhash in ed2k
854         // TODO: is this portable across networks?
855         CHECK_THROW(m_size % cs ? check == cc : check == cc - 1);
856
857         uint32_t j = 0;
858         bool saved = false;
859         for (LIter i = ret.first; i != ret.second; ++i) {
860                 m_chunks->get<ID_Length>().modify(
861                         i, bind(&Chunk::m_hash, __1) = &(*hs)[j++]
862                 );
863                 if ((*i).isComplete() && !(*i).isVerified()) {
864                         if (!saved) {
865                                 save();
866                                 saved = true;
867                         }
868                         m_chunks->get<ID_Length>().modify(
869                                 i, bind(&Chunk::verify, __1, false)
870                         );
871                 }
872         }
873 }
874
875 bool PartData::canLock(const Range64 &r, uint32_t size) const {
876         Range64 cand(r.begin(), r.begin());
877         typedef RangeList64::CIter CIter;
878         CIter i = m_complete.getContains(cand);
879         CIter j = m_locked.getContains(cand);
880         CIter k = m_dontDownload.getContains(cand);
881         do {
882                 i = m_complete.getContains(cand);
883                 j = m_locked.getContains(cand);
884                 k = m_dontDownload.getContains(cand);
885                 if (i != m_complete.end()) {
886                         cand = Range64((*i).end() + 1, (*i).end() + 1);
887                 } else if (j != m_locked.end()) {
888                         cand = Range64((*j).end() + 1, (*j).end() + 1);
889                 } else if (k != m_dontDownload.end()) {
890                         cand = Range64((*k).end() + 1, (*k).end() + 1);
891                 } else {
892                         break;
893                 }
894         } while (r.contains(cand));
895         return r.contains(cand);
896 }
897
898 LockedRangePtr PartData::getLock(UsedRangePtr used, uint32_t size) {
899         Range64 cand(used->begin(), used->begin());
900         typedef RangeList64::CIter CIter;
901         CIter i = m_complete.getContains(cand);
902         CIter j = m_locked.getContains(cand);
903         CIter k = m_dontDownload.getContains(cand);
904         do {
905                 i = m_complete.getContains(cand);
906                 j = m_locked.getContains(cand);
907                 k = m_dontDownload.getContains(cand);
908                 if (i != m_complete.end()) {
909                         cand = Range64((*i).end() + 1, (*i).end() + 1);
910                 } else if (j != m_locked.end()) {
911                         cand = Range64((*j).end() + 1, (*j).end() + 1);
912                 } else if (k != m_dontDownload.end()) {
913                         cand = Range64((*k).end() + 1, (*k).end() + 1);
914                 } else {
915                         break;
916                 }
917         } while (used->contains(cand));
918         if (!used->contains(cand)) {
919                 return LockedRangePtr();
920         }
921         cand.end(cand.begin() + size - 1);
922         if (cand.end() > used->end()) {
923                 cand.end(used->end());
924         }
925         i = m_complete.getContains(cand);
926         if (i != m_complete.end()) {
927                 cand.end((*i).begin() - 1);
928         }
929         i = m_locked.getContains(cand);
930         if (i != m_locked.end()) {
931                 cand.end((*i).begin() - 1);
932         }
933         k = m_dontDownload.getContains(cand);
934         if (k != m_dontDownload.end()) {
935                 cand.end((*k).begin() - 1);
936         }
937         CHECK_THROW(cand.length() <= size);
938         return LockedRangePtr(
939                 new LockedRange(this, cand, *used->m_chunk, used)
940         );
941 }
942
943 // publically available write method, this allows writing to the file without
944 // aquiring Used/Locked ranges - useful for simple protocols.
945 // Basically it just forwards the call to doWrite(), however it does a bunch of
946 // checks prior to the call, as well as bunch of chunkmap updates after the
947 // call in order to keep everyone happy.
948 void PartData::write(uint64_t begin, const std::string &data) {
949         logTrace(TRACE_PARTDATA,
950                 boost::format("Safe-writing at offset %d.") % begin
951         );
952
953         CHECK_THROW(!m_locked.contains(begin, begin + data.size() - 1));
954         CHECK_THROW(!m_complete.contains(begin, begin + data.size() - 1));
955
956         doWrite(begin, data);
957         tryComplete();
958 }
959
960 void PartData::updateChunks(Range64 range) {
961         CMPosIndex &pi = m_chunks->get<ID_Pos>();
962         CMPosIndex::iterator i = pi.lower_bound(Chunk(this, range, 0));
963         if (i != pi.end() && (*i).contains(range)) {
964                 (*i).updateState();
965         }
966         CMPosIndex::iterator j = i;
967         while (i != pi.begin() && (*--i).contains(range)) {
968                 (*i).updateState();
969         }
970         while (j != pi.end() && ++j != pi.end() && (*j).contains(range)) {
971                 (*j).updateState();
972         }
973 }
974
975 void PartData::setComplete(Range64 range) {
976         m_complete.merge(range);
977         m_dontDownload.erase(range);
978         updateChunks(range);
979 }
980
981 void PartData::setCorrupt(Range64 range) {
982         m_complete.erase(range);
983         m_corrupt.merge(range);
984         m_verified.erase(range);
985         onCorruption(this, range);
986         updateChunks(range);
987 }
988
989 void PartData::setVerified(Range64 range) {
990         m_verified.merge(range);
991         m_corrupt.erase(range);
992         m_complete.merge(range);
993         updateChunks(range);
994 }
995
996 // don't try to call doComplete() from here, since this method is called from
997 // Chunk::write(), prior to the chunk attempting to hash itself (if it has
998 // hashes). Thus, doComplete() check here would incorrectly succeed, since
999 // m_pendingHashes hasn't been increased, which would then lead to full rehash
1000 // being done prior to last chunkhash, resulting in crash later on during file
1001 // moving.
1002 void PartData::doWrite(uint64_t begin, const std::string &data) {
1003         // since we allow (on certain conditions) resizing partdata on the fly,
1004         // this check ensures that we don't allow writing to a file of size 0
1005         CHECK_THROW(m_size);
1006         // don't allow writing to paused/stopped file
1007         CHECK_THROW(isRunning());
1008
1009         logTrace(TRACE_PARTDATA,
1010                 boost::format("Writing at offset %d, datasize is %d")
1011                 % begin % data.size()
1012         );
1013
1014         CHECK_THROW(!m_complete.contains(begin, begin + data.size() - 1));
1015
1016         m_buffer[begin] = data;
1017         setComplete(Range64(begin, begin + data.size() - 1));
1018         m_toFlush += data.size();
1019         getEventTable().postEvent(this, PD_DATA_ADDED);
1020         if (m_toFlush >= BUF_SIZE_LIMIT) {
1021                 save();
1022         }
1023         dataAdded(this, begin, data.size());
1024 }
1025