GENIE
heap_count.cu
Go to the documentation of this file.
1 
5 #include <stdio.h>
6 #include <assert.h>
7 #include <stdlib.h>
8 #include <thrust/copy.h>
9 #include <thrust/device_vector.h>
10 #include <thrust/host_vector.h>
11 #include <thrust/count.h>
12 #include <genie/utility/Timing.h>
13 #include <genie/matching/match.h>
16 
// Upper bound on the block size used by heap_count_topk for the
// count_over_threshold, exclusive_scan and fill_in_scan launches.
17 #define THREADS_PER_BLOCK 256
// Block size used for the transform_threshold launch in heap_count_topk.
18 #define GPUGenie_Minus_One_THREADS_PER_BLOCK 1024
19 
20 using namespace std;
21 using namespace thrust;
22 using namespace genie::matching;
23 
/**
 * Counts, per thread, how many entries of a query's data slice have an
 * aggregation value strictly greater than that query's threshold.
 *
 * Launch layout: one block per query. blockIdx.x selects both the slice of
 * `data` and the entry of `thresholds`; thread t of a block visits entries
 * t, t + blockDim.x, t + 2 * blockDim.x, ... of its slice.
 *
 * @param data       consecutive slices of `data_size` entries, one per block
 * @param result     one counter per launched thread, indexed by the flat
 *                   thread id; the number of matches is added to the value
 *                   already stored there
 * @param thresholds one threshold per block
 * @param data_size  number of entries in each slice
 */
__global__
void count_over_threshold(data_t * data, int * result, u32 * thresholds,
        int data_size)
{
    const int tid = threadIdx.x;
    const int index = threadIdx.x + blockDim.x * blockIdx.x;

    // Offsets are computed in size_t so that data_size * blockIdx.x cannot
    // wrap around in 32-bit arithmetic.
    const data_t * my_data = data + (size_t) data_size * blockIdx.x;
    const u32 my_threshold = thresholds[blockIdx.x];

    // Accumulate in a register and touch global memory once at the end
    // instead of doing a read-modify-write for every match.
    int hits = 0;
    for (int id = tid; id < data_size; id += blockDim.x)
    {
        if (my_data[id].aggregation > my_threshold)
        {
            ++hits;
        }
    }

    if (hits != 0)
    {
        result[index] += hits;
    }
}
41 
/**
 * Per-block exclusive prefix sum over `size` integers.
 *
 * Each block handles its own segment: `size` inputs from `data`, a scratch
 * area of 2 * `size` ints in `buffer`, and `size` + 1 outputs in `output`.
 * Thread t owns element t, so the kernel is expected to be launched with
 * blockDim.x == size (this is how heap_count_topk launches it).
 *
 * Output element t holds the sum of inputs 0 .. t-1; the extra element at
 * position `size` holds the sum of all inputs of the segment.
 *
 * @param data   input values, `size` per block
 * @param buffer scratch space, 2 * `size` ints per block
 * @param output results, `size` + 1 ints per block
 * @param size   number of input elements per block
 */
__global__
void exclusive_scan(int * data, int * buffer, int * output, int size)
{
    const int lane = threadIdx.x;

    // Per-block views of the three arrays.
    int * const in = data + blockIdx.x * size;
    int * const out = output + blockIdx.x * (size + 1);
    int * const work = buffer + 2 * blockIdx.x * size;
    // Second half of the scratch area: a snapshot of `work` from the
    // previous step, so that reads and writes of one step never overlap.
    int * const snapshot = work + size;

    // Shift the input right by one position to make the scan exclusive.
    const int seed = (lane == 0) ? 0 : in[lane - 1];
    work[lane] = seed;
    snapshot[lane] = seed;

    __syncthreads();

    for (int stride = 1; stride < size; stride *= 2)
    {
        if (lane >= stride)
        {
            work[lane] += snapshot[lane - stride];
        }
        __syncthreads();
        // Publish this step's result for the next step to read.
        snapshot[lane] = work[lane];
        __syncthreads();
    }

    const int prefix = work[lane];
    out[lane] = prefix;
    if (lane == size - 1)
    {
        // Trailing element: total of the whole segment.
        out[lane + 1] = prefix + in[lane];
    }
}
68 
/**
 * Writes the selected entries of each query into its top-k segment.
 *
 * Launch layout: one block per query. blockIdx.x selects the data slice,
 * the threshold, the top-k segment and a row of blockDim.x + 1 scan
 * results in `indices`.
 *
 * Phase 1: every thread copies the entries of its strided share whose
 *          aggregation is strictly greater than the threshold, starting at
 *          the slot given by its own scan result.
 * Phase 2: the remaining slots (starting at the last scan result of the
 *          row) are handed out through a shared atomic cursor to entries
 *          whose integer-converted aggregation equals the threshold, until
 *          the segment is full.
 *
 * @param data       consecutive slices of `data_size` entries, one per block
 * @param thresholds one threshold per block
 * @param indices    blockDim.x + 1 scan values per block: the first
 *                   blockDim.x are per-thread start slots, the last one is
 *                   the first slot used by phase 2
 * @param topk       output, `topk_size` entries per block
 * @param data_size  number of entries in each slice
 * @param topk_size  capacity of each top-k segment
 */
__global__
void fill_in_scan(data_t * data, u32 * thresholds, int * indices, data_t * topk,
        int data_size, int topk_size)
{
    // Next free slot for phase 2, shared by the whole block.
    __shared__ int index[1];

    const int tid = threadIdx.x;

    // Offsets are computed in size_t so the products cannot wrap around in
    // 32-bit arithmetic.
    const data_t * my_data = data + (size_t) data_size * blockIdx.x;
    data_t * my_topk = topk + (size_t) topk_size * blockIdx.x;
    const int * my_indices = indices + (size_t) blockIdx.x * (blockDim.x + 1);
    const u32 my_threshold = thresholds[blockIdx.x];

    // Phase 1: entries strictly above the threshold.
    int slot = my_indices[tid];
    for (int id = tid; id < data_size; id += blockDim.x)
    {
        if (my_data[id].aggregation > my_threshold)
        {
            // Never write past this query's segment, even if more than
            // topk_size entries exceed the threshold.
            if (slot < topk_size)
            {
                my_topk[slot] = my_data[id];
            }
            ++slot;
        }
    }

    if (tid == 0)
    {
        *index = my_indices[blockDim.x];
    }
    // Makes the cursor visible to the block; no thread has returned yet, so
    // every thread reaches this barrier.
    __syncthreads();

    // Phase 2: fill the remaining slots with entries equal to the threshold.
    for (int id = tid; id < data_size; id += blockDim.x)
    {
        // Cheap early exit once the segment is full; the atomicAdd result
        // below is what actually guards the write.
        if (*index >= topk_size)
        {
            return;
        }
        if (int(my_data[id].aggregation) == my_threshold)
        {
            const int old_index = atomicAdd(index, 1);
            if (old_index >= topk_size)
            {
                return;
            }
            my_topk[old_index] = my_data[id];
        }
    }
}
116 
/**
 * Lowers every threshold by one, in place.
 *
 * A threshold greater than `max_count` becomes `max_count - 1`; any other
 * non-zero threshold is decremented; a zero threshold is left unchanged.
 * One thread handles one element; threads with a flat id >= `size` do
 * nothing.
 *
 * NOTE(review): with max_count == 0 the first branch stores
 * (u32)(0 - 1); confirm that callers never pass a zero max_count.
 *
 * @param thresholds array of `size` thresholds, updated in place
 * @param size       number of thresholds
 * @param max_count  upper limit applied to the thresholds before lowering
 */
__global__
void transform_threshold(u32 * thresholds, int size, int max_count)
{
    const int gid = threadIdx.x + blockIdx.x * blockDim.x;
    if (gid < size)
    {
        const u32 current = thresholds[gid];
        if (current > max_count)
        {
            thresholds[gid] = max_count - 1;
        }
        else if (current != 0)
        {
            thresholds[gid] = current - 1;
        }
    }
}
130 
/**
 * Debug helper: dumps the per-query hash table contents to two text files.
 *
 * The file names are built from genie::utility::currentDateTime():
 * "<stamp>.txt" lists every slot, "<stamp>-compact.txt" lists only slots
 * whose id or aggregation is non-zero. The table is copied to the host
 * first and split into `num_of_queries` equally sized parts.
 *
 * Nothing is written when `num_of_queries` is not positive or when either
 * file cannot be opened (an error message goes to stderr in that case).
 *
 * @param d_data         device table holding all queries back to back
 * @param num_of_queries number of queries stored in `d_data`
 */
void write_hashtable_to_file(thrust::device_vector<data_t>& d_data, int num_of_queries){

    // Guards the division below.
    if (num_of_queries <= 0)
    {
        return;
    }

    const int part_size = d_data.size() / num_of_queries;
    thrust::host_vector<data_t> h_data(d_data);

    // Build the names as std::string: no fixed-size buffer to overflow.
    const std::string stamp = genie::utility::currentDateTime();
    const std::string fout_name = stamp + ".txt";
    const std::string fout_compact_name = stamp + "-compact.txt";

    FILE * fout = fopen(fout_name.c_str(), "w");
    FILE * fout_compact = fopen(fout_compact_name.c_str(), "w");
    if (fout == NULL || fout_compact == NULL)
    {
        fprintf(stderr, "write_hashtable_to_file: cannot open '%s' and '%s' for writing\n",
                fout_name.c_str(), fout_compact_name.c_str());
        if (fout != NULL)
        {
            fclose(fout);
        }
        if (fout_compact != NULL)
        {
            fclose(fout_compact);
        }
        return;
    }

    for (int qi = 0; qi < num_of_queries; ++qi)
    {
        fprintf(fout, "Query %d:\n", qi);
        fprintf(fout_compact, "Query %d:\n", qi);
        for (int di = 0; di < part_size; ++di)
        {
            const size_t id = (size_t) qi * part_size + di;
            // Only occupied slots go to the compact file.
            if (h_data[id].aggregation != 0.0f || h_data[id].id != 0)
            {
                fprintf(fout_compact, "[%d] %d\n", h_data[id].id, int(h_data[id].aggregation));
            }
            fprintf(fout, "[%d] %d\n", h_data[id].id, int(h_data[id].aggregation));
        }
        fprintf(fout, "\n");
        fprintf(fout_compact, "\n");
    }

    fclose(fout);
    fclose(fout_compact);
}
160 
/**
 * Selects up to `topk` entries per query from the hash table `d_data` and
 * stores them in `d_topk`.
 *
 * Steps:
 *   1. transform_threshold lowers every value of `d_threshold` in place
 *      (capped by d_passCount.size() / num_of_queries).
 *   2. count_over_threshold counts, per thread, the entries above the
 *      lowered threshold.
 *   3. exclusive_scan turns the per-thread counts into output offsets.
 *   4. fill_in_scan writes the selected entries into `d_topk`.
 *
 * `d_data` is treated as `num_of_queries` equally sized slices.
 * NOTE(review): the kernels read one threshold per query, so `d_threshold`
 * is assumed to hold at least `num_of_queries` values - confirm at callers.
 *
 * @param d_data         hash table of all queries, back to back
 * @param d_topk         output; resized to topk * num_of_queries entries
 * @param d_threshold    per-query thresholds; modified in place
 * @param d_passCount    only its size is used, to derive the threshold cap
 * @param topk           number of result slots per query
 * @param num_of_queries number of queries; nothing is done if not positive
 */
void genie::matching::heap_count_topk(thrust::device_vector<data_t>& d_data,
        thrust::device_vector<data_t>& d_topk,
        thrust::device_vector<u32>& d_threshold,
        thrust::device_vector<u32>& d_passCount, int topk, int num_of_queries)
{
    // Guards the divisions below.
    if (num_of_queries <= 0)
    {
        d_topk.clear();
        return;
    }

    const int data_size = d_data.size() / num_of_queries;
    const int threads =
            data_size >= THREADS_PER_BLOCK ? THREADS_PER_BLOCK : data_size;
    const int max_count = d_passCount.size() / num_of_queries;

    // Step 1: lower the thresholds in place. One thread per threshold; the
    // grid size is rounded up and the kernel bounds-checks its index.
    u32 * d_threshold_p = thrust::raw_pointer_cast(d_threshold.data());
    const int num_thresholds = d_threshold.size();
    if (num_thresholds > 0)
    {
        const int blocks = (num_thresholds
                + GPUGenie_Minus_One_THREADS_PER_BLOCK - 1)
                / GPUGenie_Minus_One_THREADS_PER_BLOCK;
        transform_threshold<<<blocks, GPUGenie_Minus_One_THREADS_PER_BLOCK>>>(
                d_threshold_p, num_thresholds, max_count);
        cudaCheckErrors(cudaGetLastError());
    }

    // Prepare the output: every slot starts out as an empty entry.
    d_topk.resize((size_t) topk * num_of_queries);
    data_t empty_data = { 0, 0 };
    thrust::fill(d_topk.begin(), d_topk.end(), empty_data);

    // An empty table or zero result slots leaves nothing to select, and a
    // launch with zero threads per block would be rejected.
    if (threads == 0 || topk == 0)
    {
        return;
    }

    data_t * d_data_p = thrust::raw_pointer_cast(d_data.data());
    data_t * d_topk_p = thrust::raw_pointer_cast(d_topk.data());

    // Temporary buffers: one counter per thread, a double buffer for the
    // scan, and threads + 1 scan results per query.
    const size_t num_counters = (size_t) threads * num_of_queries;
    const size_t num_scan_values = (size_t) (threads + 1) * num_of_queries;
    int * d_num_over_threshold_p = NULL;
    int * d_buffer = NULL;
    int * d_scan_indices = NULL;

    cudaCheckErrors(cudaMalloc((void**) &d_num_over_threshold_p,
            sizeof(int) * num_counters));
    cudaCheckErrors(cudaMemset((void*) d_num_over_threshold_p, 0,
            sizeof(int) * num_counters));
    cudaCheckErrors(cudaMalloc((void**) &d_buffer,
            2 * sizeof(int) * num_counters));
    cudaCheckErrors(cudaMemset((void*) d_buffer, 0,
            2 * sizeof(int) * num_counters));
    cudaCheckErrors(cudaMalloc((void**) &d_scan_indices,
            sizeof(int) * num_scan_values));

    // Step 2: per-thread counts of entries above the threshold.
    // (write_hashtable_to_file(d_data, num_of_queries) can be called here
    // to dump the table while debugging.)
    count_over_threshold<<<num_of_queries, threads>>>(d_data_p,
            d_num_over_threshold_p, d_threshold_p, data_size);
    cudaCheckErrors(cudaGetLastError());

    // Step 3: per-query exclusive scan of the counts.
    exclusive_scan<<<num_of_queries, threads>>>(d_num_over_threshold_p,
            d_buffer, d_scan_indices, threads);
    cudaCheckErrors(cudaGetLastError());

    // Step 4: scatter the selected entries. The kernel declares its shared
    // memory statically, so no dynamic shared memory is requested.
    fill_in_scan<<<num_of_queries, threads>>>(d_data_p,
            d_threshold_p, d_scan_indices, d_topk_p, data_size, topk);
    cudaCheckErrors(cudaGetLastError());

    cudaCheckErrors(cudaFree(d_num_over_threshold_p));
    cudaCheckErrors(cudaFree(d_buffer));
    cudaCheckErrors(cudaFree(d_scan_indices));
}
283 
__global__ void transform_threshold(u32 *thresholds, int size, int max_count)
Definition: heap_count.cu:118
__global__ void exclusive_scan(int *data, int *buffer, int *output, int size)
Definition: heap_count.cu:43
#define GPUGenie_Minus_One_THREADS_PER_BLOCK
Definition: heap_count.cu:18
std::string currentDateTime()
Get the current date and time.
Definition: Timing.cc:10
This file includes interfaces of original GENIE match functions.
__global__ void fill_in_scan(data_t *data, u32 *thresholds, int *indices, data_t *topk, int data_size, int topk_size)
Definition: heap_count.cu:70
void heap_count_topk(thrust::device_vector< genie::matching::data_t > &d_data, thrust::device_vector< genie::matching::data_t > &d_topk, thrust::device_vector< u32 > &d_threshold, thrust::device_vector< u32 > &d_passCount, int topk, int num_of_queries)
uint32_t u32
Definition: match_common.h:18
This file implements the function for topk selection in the final hashtable.
void write_hashtable_to_file(thrust::device_vector< data_t > &d_data, int num_of_queries)
Definition: heap_count.cu:131
Functions about getting system time.
__global__ void count_over_threshold(data_t *data, int *result, u32 *thresholds, int data_size)
Definition: heap_count.cu:25
#define cudaCheckErrors(err)
The wrapper function to validate CUDA calls.
Definition: cuda_macros.h:23
#define THREADS_PER_BLOCK
Definition: heap_count.cu:17