Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@
}
BulkResponse result;
try {
result = client.bulk(br.refresh(Refresh.True).build());
result = client.bulk(br.refresh(Refresh.WaitFor).build());
// Log errors, if any
if (result.errors()) {
logger.log(Level.SEVERE, TextUtil.BULK_HAD_ERRORS);
Expand Down Expand Up @@ -427,17 +427,29 @@
@Override
public List<Channel> findAllById(Iterable<String> channelIds) {
try {
List<String> ids =
StreamSupport.stream(channelIds.spliterator(), false).collect(Collectors.toList());
List<String> ids = normalizeIds(channelIds);

SearchRequest.Builder searchBuilder =
new SearchRequest.Builder()
.index(esService.getES_CHANNEL_INDEX())
.query(IdsQuery.of(q -> q.values(ids))._toQuery())
.size(esService.getES_QUERY_SIZE())
.sort(SortOptions.of(s -> s.field(FieldSort.of(f -> f.field("name")))));
SearchResponse<Channel> response = client.search(searchBuilder.build(), Channel.class);
return response.hits().hits().stream().map(Hit::source).collect(Collectors.toList());
if (ids.isEmpty()) {
return Collections.emptyList();
}

int lookupBatchSize = Math.clamp(chunkSize, 1, esService.getES_QUERY_SIZE());
List<Channel> result = new ArrayList<>();

for (int i = 0; i < ids.size(); i += lookupBatchSize) {
List<String> chunk = ids.subList(i, Math.min(i + lookupBatchSize, ids.size()));
SearchRequest.Builder searchBuilder =
new SearchRequest.Builder()
.index(esService.getES_CHANNEL_INDEX())
.query(IdsQuery.of(q -> q.values(chunk))._toQuery())
.size(chunk.size())
.sort(SortOptions.of(s -> s.field(FieldSort.of(f -> f.field("name")))));
SearchResponse<Channel> response = client.search(searchBuilder.build(), Channel.class);
result.addAll(
response.hits().hits().stream().map(Hit::source).collect(Collectors.toList()));
}

return result;
} catch (ElasticsearchException | IOException e) {
logger.log(Level.SEVERE, TextUtil.FAILED_TO_FIND_ALL_CHANNELS, e);
throw new ResponseStatusException(
Expand Down Expand Up @@ -805,9 +817,70 @@
}

@Override
@SuppressWarnings("unchecked")
public void deleteAllById(Iterable<? extends String> ids) {
// TODO Auto-generated method stub
deleteAllByIdBestEffort((Iterable<String>) ids);
}

public long deleteAllByIdBestEffort(Iterable<String> ids) {
List<String> idList = normalizeIds(ids);

if (idList.isEmpty()) {
return 0;
}

long deletedCount = 0;

for (int i = 0; i < idList.size(); i += chunkSize) {
List<String> chunk = idList.subList(i, Math.min(i + chunkSize, idList.size()));
BulkRequest.Builder br = new BulkRequest.Builder();
for (String id : chunk) {
br.operations(op -> op.delete(del -> del.index(esService.getES_CHANNEL_INDEX()).id(id)));
}
br.refresh(Refresh.True);

try {
BulkResponse result = client.bulk(br.build());
for (BulkResponseItem item : result.items()) {
if (item.error() != null) {
logger.log(
Level.SEVERE,
() ->
MessageFormat.format(
"Failed to delete channel id {0}: {1}", item.id(), item.error().reason()));
continue;
}
if (Integer.valueOf(200).equals(item.status())) {
deletedCount++;
Comment thread
shroffk marked this conversation as resolved.
}
}
} catch (IOException e) {
logger.log(
Level.SEVERE,
MessageFormat.format(
"Bulk delete failed for chunk starting at index {0} with size {1}",
i, chunk.size()),
e);
}
}

return deletedCount;
}

/**
* Normalizes channel IDs by dropping null/blank values and removing duplicates while preserving
* encounter order.
*
* @param ids raw channel IDs from request/repository callers
* @return distinct, non-blank channel IDs in encounter order
*/
private static List<String> normalizeIds(Iterable<String> ids) {
// TODO: Consider rejecting blank/whitespace-only IDs with 400 at the API boundary.

Check warning on line 878 in src/main/java/org/phoebus/channelfinder/repository/ChannelRepository.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Complete the task associated to this TODO comment.

See more on https://sonarcloud.io/project/issues?id=ChannelFinder_ChannelFinderService&issues=AZ4Xugtez9F_u5ZTvjvK&open=AZ4Xugtez9F_u5ZTvjvK&pullRequest=217
return StreamSupport.stream(ids.spliterator(), false)
.filter(id -> id != null && !id.isBlank())
.collect(Collectors.toCollection(LinkedHashSet::new))
.stream()
.toList();
}

@PreDestroy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class ChannelService {

private static final Logger audit = Logger.getLogger(ChannelService.class.getName() + ".audit");
private static final Logger logger = Logger.getLogger(ChannelService.class.getName());
private static final String BATCH_OPERATION_SUBJECT = "channels batch";

private final ChannelRepository channelRepository;
private final TagRepository tagRepository;
Expand Down Expand Up @@ -92,7 +93,7 @@ public Channel create(String channelName, Channel channel) {
}

public Iterable<Channel> create(Iterable<Channel> channels) {
requireRole(ROLES.CF_CHANNEL, "channels batch");
requireRole(ROLES.CF_CHANNEL, BATCH_OPERATION_SUBJECT);

List<Channel> channelList = Lists.newArrayList(channels);
Map<String, Channel> existing = findExistingChannels(channelList);
Expand Down Expand Up @@ -145,7 +146,7 @@ public Channel update(String channelName, Channel channel) {
}

public Iterable<Channel> update(Iterable<Channel> channels) {
requireRole(ROLES.CF_CHANNEL, "channels batch");
requireRole(ROLES.CF_CHANNEL, BATCH_OPERATION_SUBJECT);

List<Channel> channelList = Lists.newArrayList(channels);
Map<String, Channel> existing = findExistingChannels(channelList);
Expand Down Expand Up @@ -179,6 +180,24 @@ public void remove(String channelName) {
channelRepository.deleteById(channelName);
}

public long remove(Iterable<String> channelNames) {
requireRole(ROLES.CF_CHANNEL, BATCH_OPERATION_SUBJECT);
List<Channel> existingChannels = channelRepository.findAllById(channelNames);

for (Channel existing : existingChannels) {
requireOwner(existing);
audit.log(
Level.INFO, () -> MessageFormat.format(TextUtil.DELETE_CHANNEL, existing.getName()));
}

if (existingChannels.isEmpty()) {
return 0;
}

return channelRepository.deleteAllByIdBestEffort(
existingChannels.stream().map(Channel::getName).toList());
}

private Map<String, Channel> findExistingChannels(List<Channel> channels) {
return channelRepository.findAllById(channels.stream().map(Channel::getName).toList()).stream()
.collect(Collectors.toMap(Channel::getName, c -> c));
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/org/phoebus/channelfinder/web/v0/api/IChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,4 +269,24 @@ long queryCount(
})
@DeleteMapping("/{channelName}")
void remove(@PathVariable("channelName") String channelName);

@Operation(
summary = "Delete multiple channels",
description = "Delete multiple channel instances identified by a request-body list of names.",
operationId = "deleteChannels",
tags = {"Channel"})
@ApiResponses(
value = {
@ApiResponse(responseCode = "200", description = "Number of channels deleted"),
@ApiResponse(
responseCode = "401",
description = "Unauthorized",
content = @Content(schema = @Schema(implementation = ResponseStatusException.class))),
@ApiResponse(
responseCode = "500",
description = "Error while trying to delete channels",
content = @Content(schema = @Schema(implementation = ResponseStatusException.class)))
Comment thread
jacomago marked this conversation as resolved.
})
@DeleteMapping
long remove(@RequestBody List<String> channelNames);
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,9 @@ public Iterable<Channel> update(Iterable<Channel> channels) {
public void remove(String channelName) {
channelService.remove(channelName);
}

@Override
public long remove(List<String> channelNames) {
return channelService.remove(channelNames);
}
}
Loading
Loading