Skip to content
This repository was archived by the owner on Aug 20, 2025. It is now read-only.

Commit 230e995

Browse files
committed
Initial commit for exposing collection re-index to non sysadmin users.
1 parent 4a65910 commit 230e995

File tree

4 files changed

+199
-58
lines changed

4 files changed

+199
-58
lines changed

stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
package org.apache.usergrid.corepersistence.index;
2121

2222

23+
import org.apache.usergrid.utils.StringUtils;
24+
2325
/**
2426
* An interface for re-indexing all entities in an application
2527
*/
@@ -47,6 +49,13 @@ public interface ReIndexService {
4749
*/
4850
ReIndexStatus getStatus( final String jobId );
4951

52+
/**
53+
* Get the status of a collection job
54+
* @param collectionName The collectionName for the rebuild index
55+
* @return
56+
*/
57+
ReIndexStatus getStatusForCollection( final String appIdString, final String collectionName );
58+
5059

5160
/**
5261
* The response when requesting a re-index operation
@@ -56,14 +65,27 @@ public class ReIndexStatus {
5665
final Status status;
5766
final long numberProcessed;
5867
final long lastUpdated;
68+
final String collectionName;
5969

6070

6171
public ReIndexStatus( final String jobId, final Status status, final long numberProcessed,
62-
final long lastUpdated ) {
63-
this.jobId = jobId;
72+
final long lastUpdated, final String collectionName ) {
73+
74+
if(StringUtils.isNotEmpty(jobId)){
75+
this.jobId = jobId;
76+
}else {
77+
this.jobId = "";
78+
}
79+
6480
this.status = status;
6581
this.numberProcessed = numberProcessed;
6682
this.lastUpdated = lastUpdated;
83+
84+
if(StringUtils.isNotEmpty(collectionName)){
85+
this.collectionName = collectionName;
86+
}else {
87+
this.collectionName = "";
88+
}
6789
}
6890

6991

@@ -74,6 +96,13 @@ public String getJobId() {
7496
return jobId;
7597
}
7698

99+
/**
100+
* Get the jobId used to resume this operation
101+
*/
102+
public String getCollectionName() {
103+
return collectionName;
104+
}
105+
77106

78107
/**
79108
* Get the last updated time, as a long

stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java

Lines changed: 71 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public class ReIndexServiceImpl implements ReIndexService {
7575
private static final String MAP_COUNT_KEY = "count";
7676
private static final String MAP_STATUS_KEY = "status";
7777
private static final String MAP_UPDATED_KEY = "lastUpdated";
78+
private static final String MAP_SEPARATOR = "|||";
7879

7980

8081
private final AllApplicationsObservable allApplicationsObservable;
@@ -140,7 +141,9 @@ public ReIndexStatus rebuildIndex( final ReIndexRequestBuilder reIndexRequestBui
140141

141142
// create an observable that loads a batch to be indexed
142143

143-
if(reIndexRequestBuilder.getCollectionName().isPresent()) {
144+
final boolean isForCollection = reIndexRequestBuilder.getCollectionName().isPresent();
145+
146+
if(isForCollection) {
144147

145148
String collectionName = InflectionUtils.pluralize(
146149
CpNamingUtils.getNameFromEdgeType(reIndexRequestBuilder.getCollectionName().get() ));
@@ -175,12 +178,36 @@ public ReIndexStatus rebuildIndex( final ReIndexRequestBuilder reIndexRequestBui
175178
if( edgeScopes.size() > 0 ) {
176179
writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));
177180
}
178-
writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() ); })
179-
.doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() ))
181+
if( isForCollection ){
182+
writeStateMetaForCollection(
183+
appId.get().getApplication().getUuid().toString(),
184+
reIndexRequestBuilder.getCollectionName().get(),
185+
Status.INPROGRESS, count.get(),
186+
System.currentTimeMillis() );
187+
}else{
188+
writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() );
189+
}
190+
})
191+
.doOnCompleted(() ->{
192+
if( isForCollection ){
193+
writeStateMetaForCollection(
194+
appId.get().getApplication().getUuid().toString(),
195+
reIndexRequestBuilder.getCollectionName().get(),
196+
Status.COMPLETE, count.get(),
197+
System.currentTimeMillis() );
198+
}else {
199+
writeStateMeta(jobId, Status.COMPLETE, count.get(), System.currentTimeMillis());
200+
}
201+
})
180202
.subscribeOn( Schedulers.io() ).subscribe();
181203

204+
if(isForCollection){
205+
return new ReIndexStatus( "", Status.STARTED, 0, 0, reIndexRequestBuilder.getCollectionName().get() );
206+
207+
}
208+
182209

183-
return new ReIndexStatus( jobId, Status.STARTED, 0, 0 );
210+
return new ReIndexStatus( jobId, Status.STARTED, 0, 0, "" );
184211
}
185212

186213

@@ -196,38 +223,15 @@ public ReIndexStatus getStatus( final String jobId ) {
196223
return getIndexResponse( jobId );
197224
}
198225

199-
200-
/**
201-
* Simple collector that counts state, then flushed every time a buffer is provided. Writes final state when complete
202-
*/
203-
private class FlushingCollector {
204-
205-
private final String jobId;
206-
private long count;
207-
208-
209-
private FlushingCollector( final String jobId ) {
210-
this.jobId = jobId;
211-
}
212-
213-
214-
public void flushBuffer( final List<EdgeScope> buffer ) {
215-
count += buffer.size();
216-
217-
//write our cursor state
218-
if ( buffer.size() > 0 ) {
219-
writeCursorState( jobId, buffer.get( buffer.size() - 1 ) );
220-
}
221-
222-
writeStateMeta( jobId, Status.INPROGRESS, count, System.currentTimeMillis() );
223-
}
224-
225-
public void complete(){
226-
writeStateMeta( jobId, Status.COMPLETE, count, System.currentTimeMillis() );
227-
}
226+
@Override
227+
public ReIndexStatus getStatusForCollection( final String appIdString, final String collectionName ) {
228+
Preconditions.checkNotNull( collectionName, "appIdString must not be null" );
229+
Preconditions.checkNotNull( collectionName, "collectionName must not be null" );
230+
return getIndexResponseForCollection( appIdString, collectionName );
228231
}
229232

230233

234+
231235
/**
232236
* Get the resume edge scope
233237
*
@@ -346,15 +350,47 @@ private ReIndexStatus getIndexResponse( final String jobId ) {
346350
final String stringStatus = mapManager.getString( jobId+MAP_STATUS_KEY );
347351

348352
if(stringStatus == null){
349-
return new ReIndexStatus( jobId, Status.UNKNOWN, 0, 0 );
353+
return new ReIndexStatus( jobId, Status.UNKNOWN, 0, 0, "" );
350354
}
351355

352356
final Status status = Status.valueOf( stringStatus );
353357

354358
final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY );
355359
final long lastUpdated = mapManager.getLong( jobId + MAP_UPDATED_KEY );
356360

357-
return new ReIndexStatus( jobId, status, processedCount, lastUpdated );
361+
return new ReIndexStatus( jobId, status, processedCount, lastUpdated, "" );
362+
}
363+
364+
365+
private void writeStateMetaForCollection(final String appIdString, final String collectionName,
366+
final Status status, final long processedCount, final long lastUpdated ) {
367+
368+
if(logger.isDebugEnabled()) {
369+
logger.debug( "Flushing state for collection {}, status {}, processedCount {}, lastUpdated {}",
370+
collectionName, status, processedCount, lastUpdated);
371+
}
372+
373+
mapManager.putString( appIdString + MAP_SEPARATOR + collectionName + MAP_STATUS_KEY, status.name() );
374+
mapManager.putLong( appIdString + MAP_SEPARATOR + collectionName + MAP_COUNT_KEY, processedCount );
375+
mapManager.putLong( appIdString + MAP_SEPARATOR + collectionName + MAP_UPDATED_KEY, lastUpdated );
376+
}
377+
378+
379+
private ReIndexStatus getIndexResponseForCollection( final String appIdString, final String collectionName ) {
380+
381+
final String stringStatus =
382+
mapManager.getString( appIdString + MAP_SEPARATOR + collectionName + MAP_STATUS_KEY );
383+
384+
if(stringStatus == null){
385+
return new ReIndexStatus( "", Status.UNKNOWN, 0, 0, collectionName );
386+
}
387+
388+
final Status status = Status.valueOf( stringStatus );
389+
390+
final long processedCount = mapManager.getLong( appIdString + MAP_SEPARATOR + collectionName + MAP_COUNT_KEY );
391+
final long lastUpdated = mapManager.getLong( appIdString + MAP_SEPARATOR + collectionName + MAP_UPDATED_KEY );
392+
393+
return new ReIndexStatus( "", status, processedCount, lastUpdated, collectionName );
358394
}
359395
}
360396

stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -251,26 +251,41 @@ public ApiResponse clearCollectionJobGet(
251251
}
252252

253253

254-
// TODO: this can't be controlled and until it can be controlled we shouldn' allow muggles to do this.
255-
// So system access only.
256-
// TODO: use scheduler here to get around people sending a reindex call 30 times.
254+
257255
@POST
258256
@Path("{itemName}/_reindex")
259257
@Produces({ MediaType.APPLICATION_JSON,"application/javascript"})
260-
@RequireSystemAccess
258+
@RequireApplicationAccess
261259
@JSONP
262260
public ApiResponse executePostForReindexing(
263-
@Context UriInfo ui, String body,
261+
@Context UriInfo ui, final Map<String, Object> payload,
264262
@PathParam("itemName") PathSegment itemName,
265263
@QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception {
266264

267265
addItemToServiceContext( ui, itemName );
268266

269267
IndexResource indexResource = new IndexResource(injector);
270-
return indexResource.rebuildIndexesPost(
268+
return indexResource.rebuildIndexCollectionPost(payload,
271269
services.getApplicationId().toString(),itemName.getPath(),false,callback );
272270
}
273271

272+
@GET
273+
@Path("{itemName}/_reindex")
274+
@Produces({ MediaType.APPLICATION_JSON,"application/javascript"})
275+
@RequireApplicationAccess
276+
@JSONP
277+
public ApiResponse executeGetForReindexStatus(
278+
@Context UriInfo ui, final Map<String, Object> payload,
279+
@PathParam("itemName") PathSegment itemName,
280+
@QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception {
281+
282+
addItemToServiceContext( ui, itemName );
283+
284+
IndexResource indexResource = new IndexResource(injector);
285+
return indexResource.rebuildIndexCollectionGet(services.getApplicationId().toString(), itemName.getPath(),
286+
callback );
287+
}
288+
274289

275290
private CollectionDeleteService getCollectionDeleteService() {
276291
return injector.getInstance( CollectionDeleteService.class );
@@ -310,18 +325,17 @@ private ApiResponse executeResumeAndCreateResponse( final Map<String, Object> pa
310325
private ApiResponse executeAndCreateResponse(final CollectionDeleteRequestBuilder request, final String callback ) {
311326

312327

313-
final CollectionDeleteService.CollectionDeleteStatus status = getCollectionDeleteService().deleteCollection( request );
328+
final CollectionDeleteService.CollectionDeleteStatus status = getCollectionDeleteService().deleteCollection(request);
314329

315330
final ApiResponse response = createApiResponse();
316331

317-
response.setAction( "clear collection" );
318-
response.setProperty( "jobId", status.getJobId() );
319-
response.setProperty( "status", status.getStatus() );
320-
response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() );
321-
response.setProperty( "numberQueued", status.getNumberProcessed() );
332+
response.setAction("clear collection");
333+
response.setProperty("jobId", status.getJobId());
334+
response.setProperty("status", status.getStatus());
335+
response.setProperty("lastUpdatedEpoch", status.getLastUpdated());
336+
response.setProperty("numberQueued", status.getNumberProcessed());
322337
response.setSuccess();
323338

324339
return response;
325340
}
326-
327341
}

0 commit comments

Comments
 (0)