libspatialindex API Reference  (git-trunk)
BulkLoader.cc
Go to the documentation of this file.
1 /******************************************************************************
2  * Project: libspatialindex - A C++ library for spatial indexing
3  * Author: Marios Hadjieleftheriou, mhadji@gmail.com
4  ******************************************************************************
5  * Copyright (c) 2002, Marios Hadjieleftheriou
6  *
7  * All rights reserved.
8  *
9  * Permission is hereby granted, free of charge, to any person obtaining a
10  * copy of this software and associated documentation files (the "Software"),
11  * to deal in the Software without restriction, including without limitation
12  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
13  * and/or sell copies of the Software, and to permit persons to whom the
14  * Software is furnished to do so, subject to the following conditions:
15  *
16  * The above copyright notice and this permission notice shall be included
17  * in all copies or substantial portions of the Software.
18  *
19  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
20  * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
22  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
24  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25  * DEALINGS IN THE SOFTWARE.
26 ******************************************************************************/
27 
28 #include <cstring>
29 #include <cstdio>
30 #include <cmath>
31 
32 #ifndef _MSC_VER
33 #include <unistd.h>
34 #endif
35 
37 
38 #include "RTree.h"
39 #include "Leaf.h"
40 #include "Index.h"
41 #include "BulkLoader.h"
42 
43 using namespace SpatialIndex;
44 using namespace SpatialIndex::RTree;
45 
46 //
47 // ExternalSorter::Record
48 //
// NOTE(review): the declarator this "= default" belongs to (listing line 49) is missing
// from this view; presumably a defaulted ExternalSorter::Record special member - confirm.
50 = default;
51 
52 ExternalSorter::Record::Record(const Region& r, id_type id, uint32_t len, uint8_t* pData, uint32_t s)
53 : m_r(r), m_id(id), m_len(len), m_pData(pData), m_s(s)
54 {
55 }
56 
// Body of a definition whose signature line (listing line 57) is absent from this view;
// presumably ExternalSorter::Record::~Record - confirm.
// Releases the payload buffer held in m_pData with array delete.
58 {
59  delete[] m_pData;
60 }
61 
// Body of Record::operator<(const Record& r) const; the signature line (listing
// line 62) is absent from this view.
// Throws Tools::IllegalStateException when the two records carry a different m_s.
// Otherwise returns true when this record's (high + low) along index m_s is strictly
// smaller than r's (high + low) along the same index.
63 {
64  if (m_s != r.m_s)
65  throw Tools::IllegalStateException("ExternalSorter::Record::operator<: Incompatible sorting dimensions.");
66 
// Compares the sums of the two bounds rather than either bound alone.
67  if (m_r.m_pHigh[m_s] + m_r.m_pLow[m_s] < r.m_r.m_pHigh[m_s] + r.m_r.m_pLow[m_s])
68  return true;
69  else
70  return false;
71 }
72 
// Body of Record::storeToFile(Tools::TemporaryFile& f); the signature line (listing
// line 73) is absent from this view.
// Writes this record to f: the id as uint64_t, [listing line 76], m_s, then the low and
// high bound of every dimension, then m_len and (only when m_len > 0) the payload bytes.
74 {
75  f.write(static_cast<uint64_t>(m_id));
// NOTE(review): listing line 76 is missing here. loadFromFile reads a uint32 "dim"
// between the id and m_s, so this line presumably writes m_r.m_dimension - confirm.
77  f.write(m_s);
78 
79  for (uint32_t i = 0; i < m_r.m_dimension; ++i)
80  {
81  f.write(m_r.m_pLow[i]);
82  f.write(m_r.m_pHigh[i]);
83  }
84 
85  f.write(m_len);
// An empty payload is represented by the length field alone.
86  if (m_len > 0) f.write(m_len, m_pData);
87 }
88 
// Body of Record::loadFromFile(Tools::TemporaryFile& f); the signature line (listing
// line 89) is absent from this view.
// Overwrites this record with the next record read from f: id (uint64), dimension
// (uint32), m_s (uint32), low/high per dimension (double), payload length (uint32) and,
// when the length is non-zero, the payload bytes.
90 {
91  m_id = static_cast<id_type>(f.readUInt64());
92  uint32_t dim = f.readUInt32();
93  m_s = f.readUInt32();
94 
// Re-allocate the bound arrays only when the stored dimension differs from the current one.
95  if (dim != m_r.m_dimension)
96  {
97  delete[] m_r.m_pLow;
98  delete[] m_r.m_pHigh;
99  m_r.m_dimension = dim;
100  m_r.m_pLow = new double[dim];
101  m_r.m_pHigh = new double[dim];
102  }
103 
104  for (uint32_t i = 0; i < m_r.m_dimension; ++i)
105  {
106  m_r.m_pLow[i] = f.readDouble();
107  m_r.m_pHigh[i] = f.readDouble();
108  }
109 
110  m_len = f.readUInt32();
// Drop any previous payload before reading the new one; m_pData stays null when m_len is 0.
111  delete[] m_pData; m_pData = nullptr;
// readBytes receives the address of m_pData; presumably it allocates the buffer - confirm.
112  if (m_len > 0) f.readBytes(m_len, &m_pData);
113 }
114 
115 //
116 // ExternalSorter
117 //
118 ExternalSorter::ExternalSorter(uint32_t u32PageSize, uint32_t u32BufferPages)
119 : m_bInsertionPhase(true), m_u32PageSize(u32PageSize),
120  m_u32BufferPages(u32BufferPages), m_u64TotalEntries(0), m_stI(0)
121 {
122 }
123 
// Body of a definition whose signature line (listing line 124) is absent from this view;
// presumably ExternalSorter::~ExternalSorter - confirm.
// Deletes every pointer still stored in m_buffer, using the member m_stI as the loop
// index. Slots already handed out by getNextRecord were set to nullptr there, so deleting
// them is a no-op.
125 {
126  for (m_stI = 0; m_stI < m_buffer.size(); ++m_stI) delete m_buffer[m_stI];
127 }
128 
// Body of ExternalSorter::insert; the signature line (listing line 129) is absent from
// this view. "r" is the record pointer being added.
// Throws Tools::IllegalStateException once the sorter has left the insertion phase.
// Appends r to m_buffer and counts it. When the buffer reaches
// m_u32PageSize * m_u32BufferPages records, it is sorted, written to a temporary file
// as one run, and emptied.
130 {
131  if (m_bInsertionPhase == false)
132  throw Tools::IllegalStateException("ExternalSorter::insert: Input has already been sorted.");
133 
134  m_buffer.push_back(r);
135  ++m_u64TotalEntries;
136 
137  // this will create the initial, sorted buckets before the
138  // external merge sort.
// NOTE(review): both factors are uint32_t, so the product is presumably computed in
// 32 bits and could wrap for very large settings - confirm.
139  if (m_buffer.size() >= m_u32PageSize * m_u32BufferPages)
140  {
141  std::sort(m_buffer.begin(), m_buffer.end(), Record::SortAscending());
// NOTE(review): listing line 142 is missing here; it presumably declares "tf" as a
// newly allocated Tools::TemporaryFile* (it is wrapped in a shared_ptr below) - confirm.
143  for (size_t j = 0; j < m_buffer.size(); ++j)
144  {
// Each record is serialized to the run file and then freed.
145  m_buffer[j]->storeToFile(*tf);
146  delete m_buffer[j];
147  }
148  m_buffer.clear();
149  tf->rewindForReading();
// m_runs takes over the temporary file through a shared_ptr.
150  m_runs.push_back(std::shared_ptr<Tools::TemporaryFile>(tf));
151  }
152 }
153 
// Body of ExternalSorter::sort; the signature line (listing line 154) is absent from
// this view.
// Ends the insertion phase. If no run was spilled to disk, the buffer is sorted in
// memory. Otherwise the remaining buffer is spilled as a final run and the runs in
// m_runs are merged, several at a time, until one sorted file remains in m_sortedFile.
// Throws Tools::IllegalStateException when called after the insertion phase has ended.
155 {
156  if (m_bInsertionPhase == false)
157  throw Tools::IllegalStateException("ExternalSorter::sort: Input has already been sorted.");
158 
159  if (m_runs.empty())
160  {
161  // The data fits in main memory. No need to store to disk.
162  std::sort(m_buffer.begin(), m_buffer.end(), Record::SortAscending());
163  m_bInsertionPhase = false;
164  return;
165  }
166 
167  if (m_buffer.size() > 0)
168  {
169  // Whatever remained in the buffer (if not filled) needs to be stored
170  // as the final bucket.
171  std::sort(m_buffer.begin(), m_buffer.end(), Record::SortAscending());
// NOTE(review): listing line 172 is missing here; it presumably declares "tf" as a
// newly allocated Tools::TemporaryFile* (it is wrapped in a shared_ptr below) - confirm.
173  for (size_t j = 0; j < m_buffer.size(); ++j)
174  {
175  m_buffer[j]->storeToFile(*tf);
176  delete m_buffer[j];
177  }
178  m_buffer.clear();
179  tf->rewindForReading();
180  m_runs.push_back(std::shared_ptr<Tools::TemporaryFile>(tf));
181  }
182 
// A single run is already fully sorted and is used as the result directly.
183  if (m_runs.size() == 1)
184  {
185  m_sortedFile = m_runs.front();
186  }
187  else
188  {
189  Record* r = nullptr;
190 
// Each pass merges the first min(m_runs.size(), m_u32BufferPages) runs into a new file.
// NOTE(review): with m_u32BufferPages < 2 a pass does not reduce the number of runs,
// so this loop appears not to terminate - confirm callers always pass at least 2.
191  while (m_runs.size() > 1)
192  {
193  std::shared_ptr<Tools::TemporaryFile> tf(new Tools::TemporaryFile());
194  std::vector<std::shared_ptr<Tools::TemporaryFile> > buckets;
195  std::vector<std::queue<Record*> > buffers;
196  std::priority_queue<PQEntry, std::vector<PQEntry>, PQEntry::SortAscending> pq;
197 
198  // initialize buffers and priority queue.
199  std::list<std::shared_ptr<Tools::TemporaryFile> >::iterator it = m_runs.begin();
200  for (uint32_t i = 0; i < (std::min)(static_cast<uint32_t>(m_runs.size()), m_u32BufferPages); ++i)
201  {
202  buckets.push_back(*it);
203  buffers.emplace_back();
204 
// The first record of the run goes straight into the priority queue, tagged with the run index.
205  r = new Record();
206  r->loadFromFile(**it);
207  // a run cannot be empty initially, so this should never fail.
208  pq.push(PQEntry(r, i));
209 
210  for (uint32_t j = 0; j < m_u32PageSize - 1; ++j)
211  {
212  // fill the buffer with the rest of the page of records.
213  try
214  {
215  r = new Record();
216  r->loadFromFile(**it);
217  buffers.back().push(r);
218  }
// NOTE(review): listing line 219 (the catch clause) is missing here; presumably
// "catch (Tools::EndOfStreamException&)" - confirm. The handler frees the record that
// could not be filled and stops reading from this run.
220  {
221  delete r;
222  break;
223  }
224  }
225  ++it;
226  }
227 
228  // exhaust buckets, buffers, and priority queue.
229  while (! pq.empty())
230  {
// Take the top entry of the queue, append it to the output file and free it.
231  PQEntry e = pq.top(); pq.pop();
232  e.m_r->storeToFile(*tf);
233  delete e.m_r;
234 
// Refill the in-memory buffer of the run this record came from when it ran dry
// and the run's file is not at its end.
235  if (! buckets[e.m_u32Index]->eof() && buffers[e.m_u32Index].empty())
236  {
237  for (uint32_t j = 0; j < m_u32PageSize; ++j)
238  {
239  try
240  {
241  r = new Record();
242  r->loadFromFile(*buckets[e.m_u32Index]);
243  buffers[e.m_u32Index].push(r);
244  }
// NOTE(review): listing line 245 (the catch clause) is missing here; presumably
// "catch (Tools::EndOfStreamException&)" - confirm.
246  {
247  delete r;
248  break;
249  }
250  }
251  }
252 
// Re-use the popped entry (same run index) for the next record of that run, if any.
253  if (! buffers[e.m_u32Index].empty())
254  {
255  e.m_r = buffers[e.m_u32Index].front();
256  buffers[e.m_u32Index].pop();
257  pq.push(e);
258  }
259  }
260 
261  tf->rewindForReading();
262 
263  // check if another pass is needed.
// Remove the runs that were just merged from the front of the list.
264  uint32_t u32Count = std::min(static_cast<uint32_t>(m_runs.size()), m_u32BufferPages);
265  for (uint32_t i = 0; i < u32Count; ++i)
266  {
267  m_runs.pop_front();
268  }
269 
270  if (m_runs.size() == 0)
271  {
272  m_sortedFile = tf;
273  break;
274  }
275  else
276  {
// Other runs remain, so the merged file is queued as a run for a later pass.
277  m_runs.push_back(tf);
278  }
279  }
280  }
281 
282  m_bInsertionPhase = false;
283 }
284 
286 {
287  if (m_bInsertionPhase == true)
288  throw Tools::IllegalStateException("ExternalSorter::getNextRecord: Input has not been sorted yet.");
289 
290  Record* ret;
291 
292  if (m_sortedFile.get() == nullptr)
293  {
294  if (m_stI < m_buffer.size())
295  {
296  ret = m_buffer[m_stI];
297  m_buffer[m_stI] = nullptr;
298  ++m_stI;
299  }
300  else
301  throw Tools::EndOfStreamException("");
302  }
303  else
304  {
305  ret = new Record();
306  ret->loadFromFile(*m_sortedFile);
307  }
308 
309  return ret;
310 }
311 
312 inline uint64_t ExternalSorter::getTotalEntries() const
313 {
314  return m_u64TotalEntries;
315 }
316 
317 //
318 // BulkLoader
319 //
// Remainder of BulkLoader::bulkLoadUsingSTR(RTree* pTree, IDataStream& stream, ...);
// the first signature lines (listing lines 320-321) are absent from this view.
// Reads every entry of the stream into an ExternalSorter, sorts them, and then builds
// the tree one level at a time with createLevel until a level yields a single entry.
// Finally records the tree height and stores the header.
322  IDataStream& stream,
323  uint32_t bindex,
324  uint32_t bleaf,
325  uint32_t pageSize,
326  uint32_t numberOfPages
327 ) {
328  if (! stream.hasNext())
// NOTE(review): listing line 329 is missing here; it presumably starts a "throw" of an
// exception constructed from the message below - confirm.
330  "RTree::BulkLoader::bulkLoadUsingSTR: Empty data stream given."
331  );
332 
// The node currently stored under the tree's root id is read and deleted before loading.
333  NodePtr n = pTree->readNode(pTree->m_rootID);
334  pTree->deleteNode(n.get());
335 
336  #ifndef NDEBUG
337  std::cerr << "RTree::BulkLoader: Sorting data." << std::endl;
338  #endif
339 
340  std::shared_ptr<ExternalSorter> es = std::shared_ptr<ExternalSorter>(new ExternalSorter(pageSize, numberOfPages));
341 
342  while (stream.hasNext())
343  {
// NOTE(review): reinterpret_cast yields nullptr only when getNext() itself returns
// nullptr, so the check below cannot detect an entry of another type; a dynamic_cast
// may have been intended - confirm.
344  Data* d = reinterpret_cast<Data*>(stream.getNext());
345  if (d == nullptr)
// NOTE(review): listing line 346 is missing here; it presumably starts a "throw" of an
// exception constructed from the message below - confirm.
347  "bulkLoadUsingSTR: RTree bulk load expects SpatialIndex::RTree::Data entries."
348  );
349 
// The payload pointer is passed to the new record and cleared on d before d is deleted.
// All records start with sort index 0.
350  es->insert(new ExternalSorter::Record(d->m_region, d->m_id, d->m_dataLength, d->m_pData, 0));
351  d->m_pData = nullptr;
352  delete d;
353  }
354  es->sort();
355 
356  pTree->m_stats.m_u64Data = es->getTotalEntries();
357 
358  // create index levels.
359  uint32_t level = 0;
360 
361  while (true)
362  {
363  #ifndef NDEBUG
364  std::cerr << "RTree::BulkLoader: Building level " << level << std::endl;
365  #endif
366 
367  pTree->m_stats.m_nodesInLevel.push_back(0);
368 
// es2 collects one record per node created on this level and becomes the input of the next level.
369  std::shared_ptr<ExternalSorter> es2 = std::shared_ptr<ExternalSorter>(new ExternalSorter(pageSize, numberOfPages));
370  createLevel(pTree, es, 0, bleaf, bindex, level++, es2, pageSize, numberOfPages);
371  es = es2;
372 
// A level that produced exactly one node ends the loop; createLevel has already
// stored that node's identifier in pTree->m_rootID.
373  if (es->getTotalEntries() == 1) break;
374  es->sort();
375  }
376 
377  pTree->m_stats.m_u32TreeHeight = level;
378  pTree->storeHeader();
379 }
380 
// Remainder of BulkLoader::createLevel; the first signature lines (listing lines
// 381-382) are absent from this view.
// Consumes the sorted records of "es" and creates the nodes of one tree level, inserting
// one record per created node into "es2". Either packs the records directly into nodes
// of up to b entries, or cuts them into slabs of S * b records, re-sorts each slab on
// the next dimension and recurses.
// NOTE(review): the declaration shown in the reference section of this listing names the
// 4th and 5th parameters (indexSize, leafSize), the opposite order of (bleaf, bindex)
// used here - confirm which naming is intended.
383  std::shared_ptr<ExternalSorter> es,
384  uint32_t dimension,
385  uint32_t bleaf,
386  uint32_t bindex,
387  uint32_t level,
388  std::shared_ptr<ExternalSorter> es2,
389  uint32_t pageSize,
390  uint32_t numberOfPages
391 ) {
// b: node capacity for this level (leaf capacity on level 0, index capacity above).
// P: number of nodes needed, ceil(total / b).  S: ceil(sqrt(P)).
392  uint64_t b = (level == 0) ? bleaf : bindex;
393  uint64_t P = static_cast<uint64_t>(std::ceil(static_cast<double>(es->getTotalEntries()) / static_cast<double>(b)));
394  uint64_t S = static_cast<uint64_t>(std::ceil(std::sqrt(static_cast<double>(P))));
395 
// Pack directly when S is 1, when this is the last dimension of the tree,
// or when S * b equals the number of entries.
396  if (S == 1 || dimension == pTree->m_dimension - 1 || S * b == es->getTotalEntries())
397  {
398  std::vector<ExternalSorter::Record*> node;
// NOTE(review): listing line 399 is missing here; it presumably declares
// "ExternalSorter::Record* r" - confirm.
400 
401  while (true)
402  {
// The end of the input is signalled by Tools::EndOfStreamException.
403  try { r = es->getNextRecord(); } catch (Tools::EndOfStreamException&) { break; }
404  node.push_back(r);
405 
406  if (node.size() == b)
407  {
// createNode deletes the records, so the vector only needs to be cleared afterwards.
408  Node* n = createNode(pTree, node, level);
409  node.clear();
410  pTree->writeNode(n);
411  es2->insert(new ExternalSorter::Record(n->m_nodeMBR, n->m_identifier, 0, nullptr, 0));
// The root id is overwritten with every node written; the last one written remains.
412  pTree->m_rootID = n->m_identifier;
413  // special case when the root has exactly bindex entries.
414  delete n;
415  }
416  }
417 
// Records left over after the last full node form one final, partially filled node.
418  if (! node.empty())
419  {
420  Node* n = createNode(pTree, node, level);
421  pTree->writeNode(n);
422  es2->insert(new ExternalSorter::Record(n->m_nodeMBR, n->m_identifier, 0, nullptr, 0));
423  pTree->m_rootID = n->m_identifier;
424  delete n;
425  }
426  }
427  else
428  {
429  bool bMore = true;
430 
431  while (bMore)
432  {
// NOTE(review): listing line 433 is missing here; it presumably declares
// "ExternalSorter::Record* pR" - confirm.
434  std::shared_ptr<ExternalSorter> es3 = std::shared_ptr<ExternalSorter>(new ExternalSorter(pageSize, numberOfPages));
435 
// Move up to S * b records into es3, re-tagged to sort on the next dimension.
436  for (uint64_t i = 0; i < S * b; ++i)
437  {
438  try { pR = es->getNextRecord(); }
439  catch (Tools::EndOfStreamException&) { bMore = false; break; }
440  pR->m_s = dimension + 1;
441  es3->insert(pR);
442  }
443  es3->sort();
// Recurse on the slab with the next dimension; results still go to the shared es2.
444  createLevel(pTree, es3, dimension + 1, bleaf, bindex, level, es2, pageSize, numberOfPages);
445  }
446  }
447 }
448 
449 Node* BulkLoader::createNode(SpatialIndex::RTree::RTree* pTree, std::vector<ExternalSorter::Record*>& e, uint32_t level)
450 {
451  Node* n;
452 
453  if (level == 0) n = new Leaf(pTree, -1);
454  else n = new Index(pTree, -1, level);
455 
456  for (size_t cChild = 0; cChild < e.size(); ++cChild)
457  {
458  n->insertEntry(e[cChild]->m_len, e[cChild]->m_pData, e[cChild]->m_r, e[cChild]->m_id);
459  e[cChild]->m_pData = nullptr;
460  delete e[cChild];
461  }
462 
463  return n;
464 }
void bulkLoadUsingSTR(RTree *pTree, IDataStream &stream, uint32_t bindex, uint32_t bleaf, uint32_t pageSize, uint32_t numberOfPages)
Definition: BulkLoader.cc:320
virtual bool hasNext()=0
double readDouble()
Definition: Tools.cc:1199
bool operator<(const Record &r) const
Definition: BulkLoader.cc:62
double * m_pLow
Definition: Region.h:98
Node * createNode(RTree *pTree, std::vector< ExternalSorter::Record *> &e, uint32_t level)
Definition: BulkLoader.cc:449
void storeToFile(Tools::TemporaryFile &f)
Definition: BulkLoader.cc:73
uint32_t m_dimension
Definition: Region.h:97
double * m_pHigh
Definition: Region.h:99
X * get() const noexcept
Definition: PoolPointer.h:55
void readBytes(uint32_t u32Len, uint8_t **pData)
Definition: Tools.cc:1217
ExternalSorter(uint32_t u32PageSize, uint32_t u32BufferPages)
Definition: BulkLoader.cc:118
uint64_t readUInt64()
Definition: Tools.cc:1181
IData * getNext() override=0
uint32_t readUInt32()
Definition: Tools.cc:1172
int64_t id_type
Definition: SpatialIndex.h:41
void createLevel(RTree *pTree, std::shared_ptr< ExternalSorter > es, uint32_t dimension, uint32_t indexSize, uint32_t leafSize, uint32_t level, std::shared_ptr< ExternalSorter > es2, uint32_t pageSize, uint32_t numberOfPages)
Definition: BulkLoader.cc:381
void loadFromFile(Tools::TemporaryFile &f)
Definition: BulkLoader.cc:89
void write(uint8_t i)
Definition: Tools.cc:1226