libspatialindex API Reference  (git-trunk)
BulkLoader.cc
Go to the documentation of this file.
1 /******************************************************************************
2  * Project: libspatialindex - A C++ library for spatial indexing
3  * Author: Marios Hadjieleftheriou, mhadji@gmail.com
4  ******************************************************************************
5  * Copyright (c) 2002, Marios Hadjieleftheriou
6  *
7  * All rights reserved.
8  *
9  * Permission is hereby granted, free of charge, to any person obtaining a
10  * copy of this software and associated documentation files (the "Software"),
11  * to deal in the Software without restriction, including without limitation
12  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
13  * and/or sell copies of the Software, and to permit persons to whom the
14  * Software is furnished to do so, subject to the following conditions:
15  *
16  * The above copyright notice and this permission notice shall be included
17  * in all copies or substantial portions of the Software.
18  *
19  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
20  * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
22  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
24  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25  * DEALINGS IN THE SOFTWARE.
26 ******************************************************************************/
27 
28 #include <cstring>
29 #include <cstdio>
30 #include <cmath>
31 
32 #ifndef _MSC_VER
33 #include <unistd.h>
34 #endif
35 
37 
38 #include "RTree.h"
39 #include "Leaf.h"
40 #include "Index.h"
41 #include "BulkLoader.h"
42 
43 using namespace SpatialIndex;
44 using namespace SpatialIndex::RTree;
45 
46 //
47 // ExternalSorter::Record
48 //
// NOTE(review): the declarator that precedes this token sequence (listing
// line 49) is missing from this extract. Presumably it is the defaulted
// default constructor of ExternalSorter::Record -- confirm against the file.
50 = default;
51 
52 ExternalSorter::Record::Record(const Region& r, id_type id, uint32_t len, uint8_t* pData, uint32_t s)
53 : m_r(r), m_id(id), m_len(len), m_pData(pData), m_s(s)
54 {
55 }
56 
// NOTE(review): the signature line (listing line 57) is missing from this
// extract. The body releases the payload array with delete[], which is what
// a destructor of ExternalSorter::Record would do -- confirm against the file.
58 {
59  delete[] m_pData;
60 }
61 
// Body of ExternalSorter::Record::operator< (the signature on listing line 62
// is missing from this extract; the page index lists it as
// `bool operator<(const Record &r) const`).
//
// Orders two records along the sort dimension m_s.
// Throws Tools::IllegalStateException when the two records carry different
// sort dimensions.
63 {
64  if (m_s != r.m_s)
65  throw Tools::IllegalStateException("ExternalSorter::Record::operator<: Incompatible sorting dimensions.");
66 
// low + high is twice the interval midpoint, so this compares the centers of
// the two regions in dimension m_s.
67  if (m_r.m_pHigh[m_s] + m_r.m_pLow[m_s] < r.m_r.m_pHigh[m_s] + r.m_r.m_pLow[m_s])
68  return true;
69  else
70  return false;
71 }
72 
// Body of ExternalSorter::Record::storeToFile (the signature on listing line
// 73 is missing from this extract; the page index lists it as
// `void storeToFile(Tools::TemporaryFile &f)`).
//
// Serializes this record to f in the order: id (as uint64_t), region
// dimension, sort dimension m_s, then low/high for every dimension, then the
// payload length, then the payload bytes when the length is non-zero.
// loadFromFile reads the fields back in the same order.
74 {
75  f.write(static_cast<uint64_t>(m_id));
76  f.write(m_r.m_dimension);
77  f.write(m_s);
78 
79  for (uint32_t i = 0; i < m_r.m_dimension; ++i)
80  {
81  f.write(m_r.m_pLow[i]);
82  f.write(m_r.m_pHigh[i]);
83  }
84 
85  f.write(m_len);
// An empty payload is represented by the length field alone.
86  if (m_len > 0) f.write(m_len, m_pData);
87 }
88 
// Body of ExternalSorter::Record::loadFromFile (the signature on listing line
// 89 is missing from this extract; the page index lists it as
// `void loadFromFile(Tools::TemporaryFile &f)`).
//
// Overwrites this record with the next serialized record in f, reading the
// fields in the order storeToFile writes them.
90 {
91  m_id = static_cast<id_type>(f.readUInt64());
92  uint32_t dim = f.readUInt32();
93  m_s = f.readUInt32();
// Resize the region to the stored dimensionality before filling it in.
94  m_r.makeDimension(dim);
95 
96  for (uint32_t i = 0; i < m_r.m_dimension; ++i)
97  {
98  m_r.m_pLow[i] = f.readDouble();
99  m_r.m_pHigh[i] = f.readDouble();
100  }
101 
102  m_len = f.readUInt32();
// Drop any payload this record already holds before reading the new one.
// NOTE(review): readBytes takes a uint8_t** -- presumably it allocates the
// buffer with new[] so that delete[] on m_pData is the matching release; confirm.
103  delete[] m_pData; m_pData = nullptr;
104  if (m_len > 0) f.readBytes(m_len, &m_pData);
105 }
106 
107 //
108 // ExternalSorter
109 //
110 ExternalSorter::ExternalSorter(uint32_t u32PageSize, uint32_t u32BufferPages)
111 : m_bInsertionPhase(true), m_u32PageSize(u32PageSize),
112  m_u32BufferPages(u32BufferPages), m_u64TotalEntries(0), m_stI(0)
113 {
114 }
115 
// NOTE(review): the signature line (listing line 116) is missing from this
// extract; the body looks like the ExternalSorter destructor -- confirm.
//
// Deletes every record pointer still held in m_buffer, using the member
// cursor m_stI as the loop index. Slots already handed out by getNextRecord
// were set to nullptr there, so deleting them is a no-op.
117 {
118  for (m_stI = 0; m_stI < m_buffer.size(); ++m_stI) delete m_buffer[m_stI];
119 }
120 
// Body of ExternalSorter::insert (the signature on listing line 121 is
// missing from this extract; the body refers to its argument as `r`, a
// Record pointer that is appended to m_buffer).
//
// Appends r to the in-memory buffer and counts it. When the buffer reaches
// m_u32PageSize * m_u32BufferPages records, the buffer is sorted, written to
// a temporary file as one sorted run, and emptied.
// Throws Tools::IllegalStateException once sort() has been called.
122 {
123  if (m_bInsertionPhase == false)
124  throw Tools::IllegalStateException("ExternalSorter::insert: Input has already been sorted.");
125 
126  m_buffer.push_back(r);
127  ++m_u64TotalEntries;
128 
129  // this will create the initial, sorted buckets before the
130  // external merge sort.
// NOTE(review): the product is computed in uint32_t and could wrap for very
// large page-size / buffer-page settings -- confirm expected ranges.
131  if (m_buffer.size() >= m_u32PageSize * m_u32BufferPages)
132  {
133  std::sort(m_buffer.begin(), m_buffer.end(), Record::SortAscending());
// NOTE(review): listing line 134, which declares `tf`, is missing from this
// extract. From its use below, tf is a raw Tools::TemporaryFile* that is only
// wrapped in a shared_ptr at the end, so it would leak if storeToFile throws.
135  for (size_t j = 0; j < m_buffer.size(); ++j)
136  {
// Each record is freed as soon as it has been written to the run file.
137  m_buffer[j]->storeToFile(*tf);
138  delete m_buffer[j];
139  }
140  m_buffer.clear();
141  tf->rewindForReading();
142  m_runs.push_back(std::shared_ptr<Tools::TemporaryFile>(tf));
143  }
144 }
145 
// Body of ExternalSorter::sort (the signature on listing line 146 is missing
// from this extract).
//
// Ends the insertion phase. If nothing was spilled to disk, the in-memory
// buffer is sorted in place. Otherwise the remaining buffer is written out as
// a last run and the runs in m_runs are merged, at most m_u32BufferPages at a
// time, until a single sorted file remains in m_sortedFile.
// Throws Tools::IllegalStateException when called a second time.
147 {
148  if (m_bInsertionPhase == false)
149  throw Tools::IllegalStateException("ExternalSorter::sort: Input has already been sorted.");
150 
151  if (m_runs.empty())
152  {
153  // The data fits in main memory. No need to store to disk.
154  std::sort(m_buffer.begin(), m_buffer.end(), Record::SortAscending());
155  m_bInsertionPhase = false;
156  return;
157  }
158 
159  if (m_buffer.size() > 0)
160  {
161  // Whatever remained in the buffer (if not filled) needs to be stored
162  // as the final bucket.
163  std::sort(m_buffer.begin(), m_buffer.end(), Record::SortAscending());
// NOTE(review): listing line 164, which declares `tf`, is missing from this
// extract; it is used below as a raw Tools::TemporaryFile*.
165  for (size_t j = 0; j < m_buffer.size(); ++j)
166  {
167  m_buffer[j]->storeToFile(*tf);
168  delete m_buffer[j];
169  }
170  m_buffer.clear();
171  tf->rewindForReading();
172  m_runs.push_back(std::shared_ptr<Tools::TemporaryFile>(tf));
173  }
174 
175  if (m_runs.size() == 1)
176  {
// A single run is already fully sorted; use it directly.
177  m_sortedFile = m_runs.front();
178  }
179  else
180  {
181  Record* r = nullptr;
182 
// Each pass merges the first min(#runs, m_u32BufferPages) runs into one new
// run `tf`, removes them from the front of m_runs and appends tf.
// NOTE(review): with m_u32BufferPages < 2 a pass removes at most one run and
// appends one, so m_runs.size() never reaches 0 -- looks like this loop would
// not terminate; confirm callers always pass at least 2.
183  while (m_runs.size() > 1)
184  {
185  std::shared_ptr<Tools::TemporaryFile> tf(new Tools::TemporaryFile());
// buckets[i] is the i-th input run, buffers[i] its read-ahead queue, and pq
// holds one candidate record per run, tagged with the run's index.
186  std::vector<std::shared_ptr<Tools::TemporaryFile> > buckets;
187  std::vector<std::queue<Record*> > buffers;
188  std::priority_queue<PQEntry, std::vector<PQEntry>, PQEntry::SortAscending> pq;
189 
190  // initialize buffers and priority queue.
191  std::list<std::shared_ptr<Tools::TemporaryFile> >::iterator it = m_runs.begin();
192  for (uint32_t i = 0; i < (std::min)(static_cast<uint32_t>(m_runs.size()), m_u32BufferPages); ++i)
193  {
194  buckets.push_back(*it);
195  buffers.emplace_back();
196 
// The first record of the run seeds the priority queue.
// NOTE(review): if this load throws, r is not freed here.
197  r = new Record();
198  r->loadFromFile(**it);
199  // a run cannot be empty initially, so this should never fail.
200  pq.push(PQEntry(r, i));
201 
202  for (uint32_t j = 0; j < m_u32PageSize - 1; ++j)
203  {
204  // fill the buffer with the rest of the page of records.
205  try
206  {
207  r = new Record();
208  r->loadFromFile(**it);
209  buffers.back().push(r);
210  }
// NOTE(review): listing line 211 (the catch clause) is missing from this
// extract; the handler frees the half-read record and stops filling.
212  {
213  delete r;
214  break;
215  }
216  }
217  ++it;
218  }
219 
220  // exhaust buckets, buffers, and priority queue.
221  while (! pq.empty())
222  {
// Emit the top entry of the queue to the output run and free it.
223  PQEntry e = pq.top(); pq.pop();
224  e.m_r->storeToFile(*tf);
225  delete e.m_r;
226 
// Refill this run's read-ahead queue with up to one page of records once it
// has drained and the run file still has data.
227  if (! buckets[e.m_u32Index]->eof() && buffers[e.m_u32Index].empty())
228  {
229  for (uint32_t j = 0; j < m_u32PageSize; ++j)
230  {
231  try
232  {
233  r = new Record();
234  r->loadFromFile(*buckets[e.m_u32Index]);
235  buffers[e.m_u32Index].push(r);
236  }
// NOTE(review): listing line 237 (the catch clause) is missing from this extract.
238  {
239  delete r;
240  break;
241  }
242  }
243  }
244 
// Replace the emitted record with the next one from the same run, if any;
// a run with nothing left simply drops out of the queue.
245  if (! buffers[e.m_u32Index].empty())
246  {
247  e.m_r = buffers[e.m_u32Index].front();
248  buffers[e.m_u32Index].pop();
249  pq.push(e);
250  }
251  }
252 
253  tf->rewindForReading();
254 
255  // check if another pass is needed.
// Remove exactly the runs that were merged in this pass.
256  uint32_t u32Count = std::min(static_cast<uint32_t>(m_runs.size()), m_u32BufferPages);
257  for (uint32_t i = 0; i < u32Count; ++i)
258  {
259  m_runs.pop_front();
260  }
261 
262  if (m_runs.size() == 0)
263  {
// Every run took part in this merge, so tf is the final sorted file.
264  m_sortedFile = tf;
265  break;
266  }
267  else
268  {
269  m_runs.push_back(tf);
270  }
271  }
272  }
273 
274  m_bInsertionPhase = false;
275 }
276 
// Body of ExternalSorter::getNextRecord (the signature on listing line 277 is
// missing from this extract).
//
// Returns the next record in sorted order as a raw pointer.
// Throws Tools::IllegalStateException when sort() has not been called yet,
// and Tools::EndOfStreamException when the in-memory buffer is exhausted.
278 {
279  if (m_bInsertionPhase == true)
280  throw Tools::IllegalStateException("ExternalSorter::getNextRecord: Input has not been sorted yet.");
281 
282  Record* ret;
283 
// No sorted file means the data never left memory: serve from m_buffer.
284  if (m_sortedFile.get() == nullptr)
285  {
286  if (m_stI < m_buffer.size())
287  {
// Hand the record out and null the slot so the buffer no longer refers to it.
288  ret = m_buffer[m_stI];
289  m_buffer[m_stI] = nullptr;
290  ++m_stI;
291  }
292  else
293  throw Tools::EndOfStreamException("");
294  }
295  else
296  {
// NOTE(review): if loadFromFile throws (presumably how end of file is
// signalled on this path, since callers catch EndOfStreamException), the
// freshly allocated record is not freed here -- confirm and consider
// deleting it before rethrowing.
297  ret = new Record();
298  ret->loadFromFile(*m_sortedFile);
299  }
300 
301  return ret;
302 }
303 
304 inline uint64_t ExternalSorter::getTotalEntries() const
305 {
306  return m_u64TotalEntries;
307 }
308 
309 //
310 // BulkLoader
311 //
// BulkLoader::bulkLoadUsingSTR (the first lines of the signature, listing
// lines 312-313, are missing from this extract; the page index lists it as
// `void bulkLoadUsingSTR(RTree *pTree, IDataStream &stream, uint32_t bindex,
// uint32_t bleaf, uint32_t pageSize, uint32_t numberOfPages)`).
//
// Replaces the tree's current root with a tree built bottom-up from `stream`:
// all entries are fed to an ExternalSorter on dimension 0, then createLevel
// is called repeatedly, each call producing the entries for the next level,
// until a level yields exactly one entry.
// bleaf / bindex are forwarded to createLevel as the per-node entry counts
// for leaf and index levels; pageSize / numberOfPages configure every
// ExternalSorter created here.
314  IDataStream& stream,
315  uint32_t bindex,
316  uint32_t bleaf,
317  uint32_t pageSize,
318  uint32_t numberOfPages
319 ) {
320  if (! stream.hasNext())
// NOTE(review): listing line 321 (the start of the throw expression) is
// missing from this extract.
322  "RTree::BulkLoader::bulkLoadUsingSTR: Empty data stream given."
323  );
324 
// Read the existing root and remove it; m_rootID is reassigned by createLevel.
325  NodePtr n = pTree->readNode(pTree->m_rootID);
326  pTree->deleteNode(n.get());
327 
328  std::shared_ptr<ExternalSorter> es = std::shared_ptr<ExternalSorter>(new ExternalSorter(pageSize, numberOfPages));
329 
330  while (stream.hasNext())
331  {
// NOTE(review): reinterpret_cast performs no runtime type check, so d is null
// only when getNext() itself returned null; the check below cannot detect a
// stream entry that is not an RTree::Data. A dynamic_cast would -- confirm
// whether that was the intent.
332  Data* d = reinterpret_cast<Data*>(stream.getNext());
333  if (d == nullptr)
// NOTE(review): listing line 334 (the start of the throw expression) is
// missing from this extract.
335  "bulkLoadUsingSTR: RTree bulk load expects SpatialIndex::RTree::Data entries."
336  );
337 
// The record takes over d's data buffer; the pointer is cleared on d before
// d is deleted.
338  es->insert(new ExternalSorter::Record(d->m_region, d->m_id, d->m_dataLength, d->m_pData, 0));
339  d->m_pData = nullptr;
340  delete d;
341  }
342  es->sort();
343 
344  pTree->m_stats.m_u64Data = es->getTotalEntries();
345 
346  // create index levels.
347  uint32_t level = 0;
348 
349  while (true)
350  {
351  pTree->m_stats.m_nodesInLevel.push_back(0);
352 
// es holds the sorted entries of the current level; createLevel fills es2
// with one record per node it writes, which becomes the next level's input.
353  std::shared_ptr<ExternalSorter> es2 = std::shared_ptr<ExternalSorter>(new ExternalSorter(pageSize, numberOfPages));
354  createLevel(pTree, es, 0, bleaf, bindex, level++, es2, pageSize, numberOfPages);
355  es = es2;
356 
// A level that produced a single node is the root level.
357  if (es->getTotalEntries() == 1) break;
358  es->sort();
359  }
360 
361  pTree->m_stats.m_u32TreeHeight = level;
362  pTree->storeHeader();
363 }
364 
// BulkLoader::createLevel (the first lines of the signature, listing lines
// 365-366, are missing from this extract; per the page index the function
// returns void and its first parameter is `RTree *pTree`).
//
// Consumes the sorted records of `es` and writes the nodes of tree level
// `level`, inserting one record (node MBR + node id) per written node into
// es2. `dimension` is the dimension es is currently sorted on.
// NOTE(review): the page index names the 4th/5th parameters of the
// declaration `indexSize, leafSize`, while here they are `bleaf, bindex`;
// the body uses the 4th as the leaf capacity -- confirm the header's naming.
367  std::shared_ptr<ExternalSorter> es,
368  uint32_t dimension,
369  uint32_t bleaf,
370  uint32_t bindex,
371  uint32_t level,
372  std::shared_ptr<ExternalSorter> es2,
373  uint32_t pageSize,
374  uint32_t numberOfPages
375 ) {
// b: entries per node at this level.
// P: ceil(entries / b), i.e. how many nodes of capacity b the entries need.
// S: ceil(sqrt(P)); records are regrouped S * b at a time below.
376  uint64_t b = (level == 0) ? bleaf : bindex;
377  uint64_t P = static_cast<uint64_t>(std::ceil(static_cast<double>(es->getTotalEntries()) / static_cast<double>(b)));
378  uint64_t S = static_cast<uint64_t>(std::ceil(std::sqrt(static_cast<double>(P))));
379 
// Pack directly when there is a single group, when the last dimension has
// been reached, or when the entries fill exactly one group of S * b.
380  if (S == 1 || dimension == pTree->m_dimension - 1 || S * b == es->getTotalEntries())
381  {
382  std::vector<ExternalSorter::Record*> node;
// NOTE(review): listing line 383, which declares `r`, is missing from this extract.
384 
385  while (true)
386  {
// End of input is signalled by EndOfStreamException.
387  try { r = es->getNextRecord(); } catch (Tools::EndOfStreamException&) { break; }
388  node.push_back(r);
389 
390  if (node.size() == b)
391  {
// A full group of b records becomes one node; createNode deletes the records.
392  Node* n = createNode(pTree, node, level);
393  node.clear();
394  pTree->writeNode(n);
395  es2->insert(new ExternalSorter::Record(n->m_nodeMBR, n->m_identifier, 0, nullptr, 0));
// m_rootID always names the most recently written node.
396  pTree->m_rootID = n->m_identifier;
397  // special case when the root has exactly bindex entries.
398  delete n;
399  }
400  }
401 
// Leftover records (fewer than b) form one final, partially filled node.
402  if (! node.empty())
403  {
404  Node* n = createNode(pTree, node, level);
405  pTree->writeNode(n);
406  es2->insert(new ExternalSorter::Record(n->m_nodeMBR, n->m_identifier, 0, nullptr, 0));
407  pTree->m_rootID = n->m_identifier;
408  delete n;
409  }
410  }
411  else
412  {
413  bool bMore = true;
414 
415  while (bMore)
416  {
// NOTE(review): listing line 417, which declares `pR`, is missing from this extract.
418  std::shared_ptr<ExternalSorter> es3 = std::shared_ptr<ExternalSorter>(new ExternalSorter(pageSize, numberOfPages));
419 
// Move up to S * b records into a fresh sorter, retagging each with the next
// sort dimension.
420  for (uint64_t i = 0; i < S * b; ++i)
421  {
422  try { pR = es->getNextRecord(); }
423  catch (Tools::EndOfStreamException&) { bMore = false; break; }
424  pR->m_s = dimension + 1;
425  es3->insert(pR);
426  }
// Re-sort the group on the next dimension and recurse at the same tree level.
427  es3->sort();
428  createLevel(pTree, es3, dimension + 1, bleaf, bindex, level, es2, pageSize, numberOfPages);
429  }
430  }
431 }
432 
433 Node* BulkLoader::createNode(SpatialIndex::RTree::RTree* pTree, std::vector<ExternalSorter::Record*>& e, uint32_t level)
434 {
435  Node* n;
436 
437  if (level == 0) n = new Leaf(pTree, -1);
438  else n = new Index(pTree, -1, level);
439 
440  for (size_t cChild = 0; cChild < e.size(); ++cChild)
441  {
442  n->insertEntry(e[cChild]->m_len, e[cChild]->m_pData, e[cChild]->m_r, e[cChild]->m_id);
443  e[cChild]->m_pData = nullptr;
444  delete e[cChild];
445  }
446 
447  return n;
448 }
IData * getNext() override=0
void createLevel(RTree *pTree, std::shared_ptr< ExternalSorter > es, uint32_t dimension, uint32_t indexSize, uint32_t leafSize, uint32_t level, std::shared_ptr< ExternalSorter > es2, uint32_t pageSize, uint32_t numberOfPages)
Definition: BulkLoader.cc:365
void bulkLoadUsingSTR(RTree *pTree, IDataStream &stream, uint32_t bindex, uint32_t bleaf, uint32_t pageSize, uint32_t numberOfPages)
Definition: BulkLoader.cc:312
Node * createNode(RTree *pTree, std::vector< ExternalSorter::Record * > &e, uint32_t level)
Definition: BulkLoader.cc:433
bool operator<(const Record &r) const
Definition: BulkLoader.cc:62
void loadFromFile(Tools::TemporaryFile &f)
Definition: BulkLoader.cc:89
void storeToFile(Tools::TemporaryFile &f)
Definition: BulkLoader.cc:73
ExternalSorter(uint32_t u32PageSize, uint32_t u32BufferPages)
Definition: BulkLoader.cc:110
double * m_pHigh
Definition: Region.h:101
double * m_pLow
Definition: Region.h:100
virtual bool hasNext()=0
X * get() const noexcept
Definition: PoolPointer.h:55
uint64_t readUInt64()
Definition: Tools.cc:1181
void readBytes(uint32_t u32Len, uint8_t **pData)
Definition: Tools.cc:1217
void write(uint8_t i)
Definition: Tools.cc:1226
void rewindForReading()
Definition: Tools.cc:1120
double readDouble()
Definition: Tools.cc:1199
uint32_t readUInt32()
Definition: Tools.cc:1172
int64_t id_type
Definition: SpatialIndex.h:41