Skip to content

Commit ceba806

Browse files
committed
feat: bulk dependent resources (#1448)
1 parent 3dccc2b commit ceba806

29 files changed

+956
-67
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceDiscriminator.java

-1
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,4 @@
77
public interface ResourceDiscriminator<R, P extends HasMetadata> {
88

99
Optional<R> distinguish(Class<R> resource, P primary, Context<P> context);
10-
1110
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/ReconcileResult.java

+37-13
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
package io.javaoperatorsdk.operator.api.reconciler.dependent;
22

3-
import java.util.Optional;
3+
import java.util.*;
4+
import java.util.stream.Collectors;
45

56
import io.fabric8.kubernetes.api.model.HasMetadata;
67
import io.javaoperatorsdk.operator.processing.event.ResourceID;
78

89
public class ReconcileResult<R> {
910

10-
private final R resource;
11-
private final Operation operation;
11+
private final Map<R, Operation> resourceOperations;
1212

1313
public static <T> ReconcileResult<T> resourceCreated(T resource) {
1414
return new ReconcileResult<>(resource, Operation.CREATED);
@@ -22,25 +22,49 @@ public static <T> ReconcileResult<T> noOperation(T resource) {
2222
return new ReconcileResult<>(resource, Operation.NONE);
2323
}
2424

25+
@SafeVarargs
26+
public static <T> ReconcileResult<T> aggregatedResult(ReconcileResult<T>... results) {
27+
if (results == null) {
28+
throw new IllegalArgumentException("Should provide results to aggregate");
29+
}
30+
if (results.length == 1) {
31+
return results[0];
32+
}
33+
final Map<T, Operation> operations = new HashMap<>(results.length);
34+
for (ReconcileResult<T> res : results) {
35+
res.getSingleResource().ifPresent(r -> operations.put(r, res.getSingleOperation()));
36+
}
37+
return new ReconcileResult<>(operations);
38+
}
39+
2540
@Override
2641
public String toString() {
27-
return getResource()
28-
.map(r -> r instanceof HasMetadata ? ResourceID.fromResource((HasMetadata) r) : r)
29-
.orElse("no resource")
30-
+ " -> " + operation;
42+
return resourceOperations.entrySet().stream().collect(Collectors.toMap(
43+
e -> e instanceof HasMetadata ? ResourceID.fromResource((HasMetadata) e) : e,
44+
Map.Entry::getValue))
45+
.toString();
3146
}
3247

3348
private ReconcileResult(R resource, Operation operation) {
34-
this.resource = resource;
35-
this.operation = operation;
49+
resourceOperations = resource != null ? Map.of(resource, operation) : Collections.emptyMap();
50+
}
51+
52+
private ReconcileResult(Map<R, Operation> operations) {
53+
resourceOperations = Collections.unmodifiableMap(operations);
54+
}
55+
56+
public Optional<R> getSingleResource() {
57+
return resourceOperations.entrySet().stream().findFirst().map(Map.Entry::getKey);
3658
}
3759

38-
public Optional<R> getResource() {
39-
return Optional.ofNullable(resource);
60+
public Operation getSingleOperation() {
61+
return resourceOperations.entrySet().stream().findFirst().map(Map.Entry::getValue)
62+
.orElseThrow();
4063
}
4164

42-
public Operation getOperation() {
43-
return operation;
65+
@SuppressWarnings("unused")
66+
public Map<R, Operation> getResourceOperations() {
67+
return resourceOperations;
4468
}
4569

4670
public enum Operation {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractDependentResource.java

+93-11
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.javaoperatorsdk.operator.processing.dependent;
22

3+
import java.util.ArrayList;
4+
import java.util.List;
35
import java.util.Optional;
46

57
import org.slf4j.Logger;
@@ -20,25 +22,73 @@ public abstract class AbstractDependentResource<R, P extends HasMetadata>
2022

2123
protected final boolean creatable = this instanceof Creator;
2224
protected final boolean updatable = this instanceof Updater;
25+
protected final boolean bulk = this instanceof BulkDependentResource;
2326

2427
protected Creator<R, P> creator;
2528
protected Updater<R, P> updater;
29+
protected BulkDependentResource<R, P> bulkDependentResource;
2630

27-
private ResourceDiscriminator<R, P> resourceDiscriminator;
31+
private final List<ResourceDiscriminator<R, P>> resourceDiscriminator = new ArrayList<>(1);
2832

2933
@SuppressWarnings("unchecked")
3034
public AbstractDependentResource() {
3135
creator = creatable ? (Creator<R, P>) this : null;
3236
updater = updatable ? (Updater<R, P>) this : null;
37+
38+
bulkDependentResource = bulk ? (BulkDependentResource<R, P>) this : null;
3339
}
3440

3541
@Override
3642
public ReconcileResult<R> reconcile(P primary, Context<P> context) {
37-
Optional<R> maybeActual = getSecondaryResource(primary, context);
43+
if (bulk) {
44+
final var count = bulkDependentResource.count(primary, context);
45+
deleteBulkResourcesIfRequired(count, lastKnownBulkSize(), primary, context);
46+
adjustDiscriminators(count);
47+
@SuppressWarnings("unchecked")
48+
final ReconcileResult<R>[] results = new ReconcileResult[count];
49+
for (int i = 0; i < count; i++) {
50+
results[i] = reconcileIndexAware(primary, i, context);
51+
}
52+
return ReconcileResult.aggregatedResult(results);
53+
} else {
54+
return reconcileIndexAware(primary, 0, context);
55+
}
56+
}
57+
58+
protected void deleteBulkResourcesIfRequired(int targetCount, int actualCount, P primary,
59+
Context<P> context) {
60+
if (targetCount >= actualCount) {
61+
return;
62+
}
63+
for (int i = targetCount; i < actualCount; i++) {
64+
var resource = getSecondaryResourceIndexAware(primary, i, context);
65+
var index = i;
66+
resource.ifPresent(
67+
r -> bulkDependentResource.deleteBulkResourceWithIndex(primary, r, index, context));
68+
}
69+
}
70+
71+
private void adjustDiscriminators(int count) {
72+
if (resourceDiscriminator.size() == count) {
73+
return;
74+
}
75+
if (resourceDiscriminator.size() < count) {
76+
for (int i = resourceDiscriminator.size(); i < count; i++) {
77+
resourceDiscriminator.add(bulkDependentResource.getResourceDiscriminator(i));
78+
}
79+
}
80+
if (resourceDiscriminator.size() > count) {
81+
resourceDiscriminator.subList(count, resourceDiscriminator.size()).clear();
82+
}
83+
}
84+
85+
protected ReconcileResult<R> reconcileIndexAware(P primary, int i, Context<P> context) {
86+
Optional<R> maybeActual = bulk ? getSecondaryResourceIndexAware(primary, i, context)
87+
: getSecondaryResource(primary, context);
3888
if (creatable || updatable) {
3989
if (maybeActual.isEmpty()) {
4090
if (creatable) {
41-
var desired = desired(primary, context);
91+
var desired = desiredIndexAware(primary, i, context);
4292
throwIfNull(desired, primary, "Desired");
4393
logForOperation("Creating", primary, desired);
4494
var createdResource = handleCreate(desired, primary, context);
@@ -47,9 +97,15 @@ public ReconcileResult<R> reconcile(P primary, Context<P> context) {
4797
} else {
4898
final var actual = maybeActual.get();
4999
if (updatable) {
50-
final var match = updater.match(actual, primary, context);
100+
final Matcher.Result<R> match;
101+
if (bulk) {
102+
match = updater.match(actual, primary, i, context);
103+
} else {
104+
match = updater.match(actual, primary, context);
105+
}
51106
if (!match.matched()) {
52-
final var desired = match.computedDesired().orElseGet(() -> desired(primary, context));
107+
final var desired =
108+
match.computedDesired().orElse(desiredIndexAware(primary, i, context));
53109
throwIfNull(desired, primary, "Desired");
54110
logForOperation("Updating", primary, desired);
55111
var updatedResource = handleUpdate(actual, desired, primary, context);
@@ -67,9 +123,18 @@ public ReconcileResult<R> reconcile(P primary, Context<P> context) {
67123
return ReconcileResult.noOperation(maybeActual.orElse(null));
68124
}
69125

126+
private R desiredIndexAware(P primary, int i, Context<P> context) {
127+
return bulk ? desired(primary, i, context)
128+
: desired(primary, context);
129+
}
130+
70131
public Optional<R> getSecondaryResource(P primary, Context<P> context) {
71-
return resourceDiscriminator == null ? context.getSecondaryResource(resourceType())
72-
: resourceDiscriminator.distinguish(resourceType(), primary, context);
132+
return resourceDiscriminator.isEmpty() ? context.getSecondaryResource(resourceType())
133+
: resourceDiscriminator.get(0).distinguish(resourceType(), primary, context);
134+
}
135+
136+
protected Optional<R> getSecondaryResourceIndexAware(P primary, int index, Context<P> context) {
137+
return context.getSecondaryResource(resourceType(), resourceDiscriminator.get(index));
73138
}
74139

75140
private void throwIfNull(R desired, P primary, String descriptor) {
@@ -97,7 +162,7 @@ protected R handleCreate(R desired, P primary, Context<P> context) {
97162
}
98163

99164
/**
100-
* Allows sub-classes to perform additional processing (e.g. caching) on the created resource if
165+
* Allows subclasses to perform additional processing (e.g. caching) on the created resource if
101166
* needed.
102167
*
103168
* @param primaryResourceId the {@link ResourceID} of the primary resource associated with the
@@ -129,12 +194,29 @@ protected R desired(P primary, Context<P> context) {
129194
"desired method must be implemented if this DependentResource can be created and/or updated");
130195
}
131196

132-
public void setResourceDiscriminator(
197+
protected R desired(P primary, int index, Context<P> context) {
198+
throw new IllegalStateException(
199+
"Must be implemented for bulk DependentResource creation");
200+
}
201+
202+
public AbstractDependentResource<R, P> setResourceDiscriminator(
133203
ResourceDiscriminator<R, P> resourceDiscriminator) {
134-
this.resourceDiscriminator = resourceDiscriminator;
204+
if (resourceDiscriminator != null) {
205+
this.resourceDiscriminator.add(resourceDiscriminator);
206+
}
207+
return this;
135208
}
136209

137210
public ResourceDiscriminator<R, P> getResourceDiscriminator() {
138-
return resourceDiscriminator;
211+
if (this.resourceDiscriminator.isEmpty()) {
212+
return null;
213+
} else {
214+
return this.resourceDiscriminator.get(0);
215+
}
139216
}
217+
218+
protected int lastKnownBulkSize() {
219+
return resourceDiscriminator.size();
220+
}
221+
140222
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.javaoperatorsdk.operator.processing.dependent;
2+
3+
import io.fabric8.kubernetes.api.model.HasMetadata;
4+
import io.javaoperatorsdk.operator.api.reconciler.Context;
5+
import io.javaoperatorsdk.operator.api.reconciler.ResourceDiscriminator;
6+
import io.javaoperatorsdk.operator.api.reconciler.dependent.Deleter;
7+
8+
/**
9+
* Manages dynamic number of resources created for a primary resource. Since the point of a bulk
10+
* dependent resource is to manage the number of secondary resources dynamically it implement
11+
* {@link Creator} and {@link Deleter} interfaces out of the box. A concrete dependent resource can
12+
* implement additionally also {@link Updater}.
13+
*/
14+
public interface BulkDependentResource<R, P extends HasMetadata> extends Creator<R, P>, Deleter<P> {
15+
16+
/**
17+
* @return number of resources to create
18+
*/
19+
int count(P primary, Context<P> context);
20+
21+
R desired(P primary, int index, Context<P> context);
22+
23+
/**
24+
* Used to delete resource if the desired count is lower than the actual count of a resource.
25+
*
26+
* @param primary resource
27+
* @param resource actual resource from the cache for the index
28+
* @param i index of the resource
29+
* @param context actual context
30+
*/
31+
void deleteBulkResourceWithIndex(P primary, R resource, int i, Context<P> context);
32+
33+
ResourceDiscriminator<R, P> getResourceDiscriminator(int index);
34+
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package io.javaoperatorsdk.operator.processing.dependent;
2+
3+
import io.fabric8.kubernetes.api.model.HasMetadata;
4+
import io.javaoperatorsdk.operator.api.reconciler.Context;
5+
6+
/**
7+
* Helper for the Bulk Dependent Resources to make it more explicit that bulk needs to only
8+
* implement the index aware match method.
9+
*
10+
* @param <R> secondary resource type
11+
* @param <P> primary resource type
12+
*/
13+
public interface BulkUpdater<R, P extends HasMetadata> extends Updater<R, P> {
14+
15+
default Matcher.Result<R> match(R actualResource, P primary, Context<P> context) {
16+
throw new IllegalStateException();
17+
}
18+
19+
Matcher.Result<R> match(R actualResource, P primary, int index, Context<P> context);
20+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/DesiredEqualsMatcher.java

+6
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,10 @@ public Result<R> match(R actualResource, P primary, Context<P> context) {
1616
var desired = abstractDependentResource.desired(primary, context);
1717
return Result.computed(actualResource.equals(desired), desired);
1818
}
19+
20+
@Override
21+
public Result<R> match(R actualResource, P primary, int index, Context<P> context) {
22+
var desired = abstractDependentResource.desired(primary, index, context);
23+
return Result.computed(actualResource.equals(desired), desired);
24+
}
1925
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/Matcher.java

+15
Original file line numberDiff line numberDiff line change
@@ -95,4 +95,19 @@ public Optional<T> computedDesired() {
9595
* {@link Result#computed(boolean, Object)})
9696
*/
9797
Result<R> match(R actualResource, P primary, Context<P> context);
98+
99+
/**
100+
* Determines whether the specified secondary resource matches the desired state with target index
101+
* of a bulk resource as defined from the specified primary resource, given the specified
102+
* {@link Context}.
103+
*
104+
* @param actualResource the resource we want to determine whether it's matching the desired state
105+
* @param primary the primary resource from which the desired state is inferred
106+
* @param context the context in which the resource is being matched
107+
* @return a {@link Result} encapsulating whether the resource matched its desired state and this
108+
* associated state if it was computed as part of the matching process. Use the static
109+
* convenience methods ({@link Result#nonComputed(boolean)} and
110+
* {@link Result#computed(boolean, Object)})
111+
*/
112+
Result<R> match(R actualResource, P primary, int index, Context<P> context);
98113
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/Updater.java

+4
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,8 @@ public interface Updater<R, P extends HasMetadata> {
88
R update(R actual, R desired, P primary, Context<P> context);
99

1010
Result<R> match(R actualResource, P primary, Context<P> context);
11+
12+
default Result<R> match(R actualResource, P primary, int index, Context<P> context) {
13+
throw new IllegalStateException("Implement this for bulk matching");
14+
}
1115
}

0 commit comments

Comments
 (0)