We expect customers to task the operator with managing hundreds of WebLogic domains across dozens of Kubernetes Namespaces. Therefore, we have designed the operator with an efficient user-level thread pattern. We’ve used that pattern to implement an asynchronous call model for Kubernetes API requests. This call model has built-in support for timeouts, retries with exponential backoff, and lists that exceed the requested maximum size, which are retrieved in pages using the Kubernetes continue token.
The user-level thread pattern is implemented by the classes in the oracle.kubernetes.operator.work package.
Engine: The executor service and factory for Fibers.
Fiber: The user-level thread. Fibers represent the execution of a single processing flow through a series of Steps. Fibers may be suspended and later resumed, and do not consume a Thread while suspended.
Step: Individual CPU-bound activity in a processing flow.
Packet: Context of the processing flow.
NextAction: Used by a Step when it returns control to the Fiber to indicate what should happen next. Common ‘next actions’ are to execute another Step or to suspend the Fiber.
Component: Provider of SPIs that may be useful to the processing flow.
Container: Represents the containing environment and is a Component.
Each Step has a reference to the next Step in the processing flow; however, Steps are not required to indicate that the next Step be invoked by the Fiber when the Step returns a NextAction to the Fiber. This leads to common use cases where Fibers invoke a series of Steps that are linked by the ‘is-next’ relationship, but just as commonly, use cases where the Fiber will invoke sets of Steps along a detour before returning to the normal flow.
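The relationship between the ‘is-next’ link and a detour can be illustrated with a toy model. The following is a minimal sketch that does not use the operator's classes: ToyStep, Next, Named, Detouring, and run() are hypothetical names invented for illustration, and the driver loop leaves out suspension and threading entirely.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the Step / NextAction / Fiber relationship; NOT the operator's classes. */
final class StepFlowSketch {

  /** What a step asks the driver loop to run next (a null step ends the flow). */
  static final class Next {
    final ToyStep step;
    Next(ToyStep step) { this.step = step; }
  }

  /** A unit of work that holds a reference to its 'is-next' successor. */
  abstract static class ToyStep {
    final ToyStep next;
    ToyStep(ToyStep next) { this.next = next; }

    abstract Next apply(List<String> trace);

    /** Continue with the linked successor. */
    Next doNext() { return new Next(next); }

    /** Continue with some other step instead, for example the start of a detour. */
    Next doNext(ToyStep other) { return new Next(other); }
  }

  /** Records its name and follows the 'is-next' link. */
  static final class Named extends ToyStep {
    final String name;
    Named(String name, ToyStep next) { super(next); this.name = name; }

    @Override
    Next apply(List<String> trace) {
      trace.add(name);
      return doNext();
    }
  }

  /** Records "B", then detours through two extra steps that rejoin the normal flow. */
  static final class Detouring extends ToyStep {
    Detouring(ToyStep next) { super(next); }

    @Override
    Next apply(List<String> trace) {
      trace.add("B");
      // The last detour step links back to this step's own successor.
      return doNext(new Named("detour-1", new Named("detour-2", next)));
    }
  }

  /** Driver loop: the role a Fiber plays, minus suspension and threading. */
  static List<String> run(ToyStep first) {
    List<String> trace = new ArrayList<>();
    ToyStep current = first;
    while (current != null) {
      current = current.apply(trace).step;
    }
    return trace;
  }

  static List<String> demo() {
    return run(new Named("A", new Detouring(new Named("C", null))));
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [A, B, detour-1, detour-2, C]
  }
}
```

The step that detours decides at run time what executes next, while the statically linked chain (A, B, C) stays unchanged.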
In this sample, the caller creates an Engine, a Fiber, a linked set of Step instances, and a Packet. The Fiber is then started. The Engine would typically be a singleton, since it’s backed by a ScheduledExecutorService. The Packet would also typically be pre-loaded with values that the Steps would use in their apply() methods.
static class SomeClass {
  public static void main(String[] args) {
    Engine engine = new Engine("worker-pool");
    Fiber fiber = engine.createFiber();
    Step step = new StepOne(new StepTwo(new StepThree(null)));
    Packet packet = new Packet();
    fiber.start(
        step,
        packet,
        new CompletionCallback() {
          @Override
          public void onCompletion(Packet packet) {
            // Fiber has completed successfully
          }

          @Override
          public void onThrowable(Packet packet, Throwable throwable) {
            // Fiber processing was terminated with an exception
          }
        });
  }
}
Steps must not invoke sleep or blocking calls from within apply(). Doing so prevents the worker threads from serving other Fibers. Instead, use asynchronous calls and the Fiber suspend/resume pattern. Step provides a method, doDelay(), which creates a NextAction to drive Fiber suspend/resume; it is a better option than sleep precisely because the worker thread can serve other Fibers during the delay. For asynchronous IO or similar patterns, suspend the Fiber. In the callback that runs as the Fiber suspends, initiate the asynchronous call. Finally, when the call completes, resume the Fiber. The suspend/resume functionality handles the case where the Fiber is resumed before the suspending callback completes.
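To see why a scheduled continuation is preferable to sleep, consider the following minimal sketch. It uses only the JDK's ScheduledExecutorService, not the operator's Engine or doDelay(); the class name, the event strings, and the single-thread pool are assumptions made for illustration. One flow schedules its continuation for later, which leaves the only worker thread free to run a second flow in the meantime.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Illustration only: a delay expressed as a scheduled continuation instead of sleep. */
final class NonBlockingDelaySketch {

  static List<String> run(long delayMillis) {
    List<String> events = new CopyOnWriteArrayList<>();
    // A single worker thread makes it obvious that nothing may block it.
    ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();
    CountDownLatch done = new CountDownLatch(2);
    try {
      // Flow 1: do some work, then schedule the rest of the flow rather than sleeping.
      worker.execute(() -> {
        events.add("flow-1: before delay");
        worker.schedule(() -> {
          events.add("flow-1: after delay");
          done.countDown();
        }, delayMillis, TimeUnit.MILLISECONDS);
      });
      // Flow 2: can use the same worker thread while flow 1 is waiting.
      worker.execute(() -> {
        events.add("flow-2: ran on the shared worker");
        done.countDown();
      });
      if (!done.await(delayMillis + 5_000, TimeUnit.MILLISECONDS)) {
        throw new IllegalStateException("flows did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      worker.shutdownNow();
    }
    return events;
  }

  public static void main(String[] args) {
    run(200).forEach(System.out::println);
  }
}
```

Had flow 1 called Thread.sleep() instead, flow 2 could not have started until the delay had elapsed, because the only worker thread would have been occupied.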
In this sample, the step uses asynchronous file IO and the suspend/resume Fiber pattern.
// Requires imports from java.io, java.net, java.nio, java.nio.channels, and java.nio.file
static class StepTwo extends Step {
  public StepTwo(Step next) {
    super(next);
  }

  @Override
  public NextAction apply(Packet packet) {
    return doSuspend((fiber) -> {
      // The Fiber is now suspended
      // Start the asynchronous call
      try {
        Path path = Paths.get(URI.create(this.getClass().getResource("/somefile.dat").toString()));
        AsynchronousFileChannel fileChannel =
            AsynchronousFileChannel.open(path, StandardOpenOption.READ);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        fileChannel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
          @Override
          public void completed(Integer result, ByteBuffer attachment) {
            // Store data in Packet and resume Fiber
            packet.put("DATA_SIZE_READ", result);
            packet.put("DATA_FROM_SOMEFILE", attachment);
            fiber.resume(packet);
          }

          @Override
          public void failed(Throwable exc, ByteBuffer attachment) {
            // log exc
            completed(0, null);
          }
        });
      } catch (IOException e) {
        // log exception
        // The asynchronous read was never started, so no callback will fire;
        // if not resumed here, Fiber will never be resumed
        fiber.resume(packet);
      }
    });
  }
}
The asynchronous call model is implemented by classes in the oracle.kubernetes.operator.helpers package, including CallBuilder and ResponseStep. The model is based on the Fiber suspend/resume pattern described previously. CallBuilder provides many methods having names ending with “Async”, such as listPodAsync() or deleteServiceAsync(). These methods return a Step that can be returned as part of a NextAction. When creating these Steps, the developer must provide a ResponseStep. Only ResponseStep.onSuccess() must be implemented; however, it is often useful to override onFailure() as Kubernetes treats 404 (Not Found) as a failure.
In this sample, the developer is using the pattern to list pods from the default namespace that are labeled as part of cluster-1.
static class StepOne extends Step {
  public StepOne(Step next) {
    super(next);
  }

  @Override
  public NextAction apply(Packet packet) {
    String namespace = "default";
    Step step = CallBuilder.create().with($ -> {
      $.labelSelector = "weblogic.clusterName=cluster-1";
      $.limit = 50;
      $.timeoutSeconds = 30;
    }).listPodAsync(namespace, new ResponseStep<V1PodList>(next) {
      @Override
      public NextAction onFailure(Packet packet, ApiException e, int statusCode,
          Map<String, List<String>> responseHeaders) {
        if (statusCode == CallBuilder.NOT_FOUND) {
          // Treat 404 (Not Found) as success with a null result
          return onSuccess(packet, null, statusCode, responseHeaders);
        }
        return super.onFailure(packet, e, statusCode, responseHeaders);
      }

      @Override
      public NextAction onSuccess(Packet packet, V1PodList result, int statusCode,
          Map<String, List<String>> responseHeaders) {
        // do something with the resulting pod list, if not null
        return doNext(packet);
      }
    });
    return doNext(step, packet);
  }
}
Notice that the required parameters, such as namespace, are method arguments, but optional parameters are designated using a simplified builder pattern using with() and a lambda.
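The simplified builder pattern can be sketched in isolation as follows. This is not the CallBuilder implementation: Options, ListRequest, describe(), and the default values are hypothetical and exist only to show how a with() method that accepts a lambda lets the caller override just the optional parameters it cares about.

```java
import java.util.function.Consumer;

/** Illustration only: optional parameters set through with() and a lambda. */
final class OptionalParamsSketch {

  /** Hypothetical holder for optional parameters; the defaults are made up. */
  static final class Options {
    String labelSelector = "";
    int limit = 500;
    int timeoutSeconds = 5;
  }

  /** Hypothetical request type; required parameters are ordinary method arguments. */
  static final class ListRequest {
    private final Options options = new Options();

    /** Lets the caller mutate only the options it wants to change. */
    ListRequest with(Consumer<Options> customizer) {
      customizer.accept(options);
      return this;
    }

    /** Stands in for issuing the call; just reports the effective parameters. */
    String describe(String namespace) {
      return "namespace=" + namespace
          + " labelSelector=" + options.labelSelector
          + " limit=" + options.limit
          + " timeoutSeconds=" + options.timeoutSeconds;
    }
  }

  static String demo() {
    return new ListRequest()
        .with(opts -> {
          opts.labelSelector = "weblogic.clusterName=cluster-1";
          opts.limit = 50;
        })
        .describe("default");
  }

  public static void main(String[] args) {
    // prints namespace=default labelSelector=weblogic.clusterName=cluster-1 limit=50 timeoutSeconds=5
    System.out.println(demo());
  }
}
```

Because timeoutSeconds is not touched in the lambda, it keeps its default, which is the main convenience of this style over long argument lists.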
The default behavior of onFailure() is to retry the request, with an exponential backoff, on status codes 429 (TooManyRequests), 500 (InternalServerError), 503 (ServiceUnavailable), and 504 (ServerTimeout), or on a simple timeout with no response from the server.
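The retry decision described above can be approximated with a small policy function. The following is a sketch under stated assumptions, not the operator's retry strategy: the base delay, the cap, the attempt limit, and all names are hypothetical, and real implementations often add random jitter, which is omitted here to keep the results deterministic.

```java
import java.util.OptionalLong;
import java.util.Set;

/** Illustration only: which failures to retry and how long to wait before each retry. */
final class RetryPolicySketch {

  /** Status codes named in the text as retried by default. */
  static final Set<Integer> RETRYABLE = Set.of(429, 500, 503, 504);

  // Hypothetical tuning values; the operator's actual values are not stated in this document.
  static final long BASE_DELAY_MILLIS = 100;
  static final long MAX_DELAY_MILLIS = 10_000;
  static final int MAX_ATTEMPTS = 5;

  static boolean isRetryable(int statusCode) {
    return RETRYABLE.contains(statusCode);
  }

  /** Delay before retry number {@code attempt} (1-based): base * 2^(attempt - 1), capped. */
  static long delayMillis(int attempt) {
    int exponent = Math.min(Math.max(attempt, 1) - 1, 20);
    return Math.min(BASE_DELAY_MILLIS << exponent, MAX_DELAY_MILLIS);
  }

  /** Returns the delay to wait before retrying, or empty when the call should not be retried. */
  static OptionalLong nextDelayMillis(int statusCode, boolean timedOut, int attempt) {
    if (attempt > MAX_ATTEMPTS) {
      return OptionalLong.empty();
    }
    if (!timedOut && !isRetryable(statusCode)) {
      return OptionalLong.empty();
    }
    return OptionalLong.of(delayMillis(attempt));
  }

  public static void main(String[] args) {
    System.out.println(nextDelayMillis(503, false, 3)); // prints OptionalLong[400]
    System.out.println(nextDelayMillis(404, false, 1)); // prints OptionalLong.empty
  }
}
```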
If the server responds with status code 409 (Conflict), then this indicates an optimistic locking failure. A common case is that the code read a Kubernetes object in one asynchronous step, modified the object, and attempted to replace the object in another asynchronous step; however, another activity replaced that same object in the interim. In this case, retrying the same request would give the same result. Therefore, developers may provide an “on conflict” step when calling super.onFailure(). The conflict step will be invoked after an exponential backoff delay. In this example, that conflict step should be the step that reads the existing Kubernetes object.
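The reason the conflict step should go back to the read can be shown with an in-memory stand-in for a versioned object. This is a minimal sketch, not Kubernetes client code: Store, Versioned, and updateWithRetry() are hypothetical, a boolean return value stands in for a 409 response, and the backoff delay between attempts is omitted.

```java
import java.util.function.UnaryOperator;

/** Illustration only: read-modify-replace with optimistic locking and re-read on conflict. */
final class ConflictRetrySketch {

  /** An object value paired with the version it was read at. */
  static final class Versioned {
    final long version;
    final String value;
    Versioned(long version, String value) { this.version = version; this.value = value; }
  }

  /** In-memory stand-in for the server side of a versioned object. */
  static final class Store {
    private Versioned current = new Versioned(1, "replicas=1");

    synchronized Versioned read() { return current; }

    /** Returns false (analogous to 409 Conflict) if the caller's version is stale. */
    synchronized boolean replace(long expectedVersion, String newValue) {
      if (current.version != expectedVersion) {
        return false;
      }
      current = new Versioned(expectedVersion + 1, newValue);
      return true;
    }
  }

  /** Returns the number of attempts used; interference simulates one concurrent writer. */
  static int updateWithRetry(Store store, UnaryOperator<String> change,
      Runnable interference, int maxAttempts) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      Versioned seen = store.read(); // the "read" step; a conflict sends the flow back here
      if (attempt == 1) {
        interference.run(); // another activity replaces the object in the interim
      }
      if (store.replace(seen.version, change.apply(seen.value))) {
        return attempt;
      }
      // Conflict: repeating the same replace would fail again, so loop back and re-read.
    }
    throw new IllegalStateException("gave up after " + maxAttempts + " attempts");
  }

  static String demo() {
    Store store = new Store();
    int attempts = updateWithRetry(
        store, value -> value + ",paused=true", () -> store.replace(1, "replicas=2"), 3);
    Versioned result = store.read();
    return attempts + " attempts, version " + result.version + ", " + result.value;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints 2 attempts, version 3, replicas=2,paused=true
  }
}
```

The second attempt succeeds only because it starts from a fresh read; it also preserves the other writer's change (replicas=2) instead of overwriting it.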