Description
vinodkone discovered that this can happen if the scheduler calls SchedulerDriver#stop before or while handling Scheduler#statusUpdate.
In src/sched/sched.cpp:
The driver invokes statusUpdate and later checks the aborted flag to determine whether to send an ACK.
void statusUpdate(
    const UPID& from,
    const StatusUpdate& update,
    const UPID& pid)
{
  ...

  scheduler->statusUpdate(driver, status);

  VLOG(1) << "Scheduler::statusUpdate took " << stopwatch.elapsed();

  // Note that we need to look at the volatile 'aborted' here to
  // so that we don't acknowledge the update if the driver was
  // aborted during the processing of the update.
  if (aborted) {
    VLOG(1) << "Not sending status update acknowledgment message because "
            << "the driver is aborted!";
    return;
  }

  ...
In src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp:
The statusUpdate implementation checks for an exception and invokes driver->abort().
void JNIScheduler::statusUpdate(SchedulerDriver* driver,
                                const TaskStatus& status)
{
  jvm->AttachCurrentThread(JNIENV_CAST(&env), NULL);

  jclass clazz = env->GetObjectClass(jdriver);

  jfieldID scheduler =
    env->GetFieldID(clazz, "scheduler", "Lorg/apache/mesos/Scheduler;");
  jobject jscheduler = env->GetObjectField(jdriver, scheduler);

  clazz = env->GetObjectClass(jscheduler);

  // scheduler.statusUpdate(driver, status);
  jmethodID statusUpdate =
    env->GetMethodID(clazz, "statusUpdate",
                     "(Lorg/apache/mesos/SchedulerDriver;"
                     "Lorg/apache/mesos/Protos$TaskStatus;)V");

  jobject jstatus = convert<TaskStatus>(env, status);

  env->ExceptionClear();

  env->CallVoidMethod(jscheduler, statusUpdate, jdriver, jstatus);

  if (env->ExceptionCheck()) {
    env->ExceptionDescribe();
    env->ExceptionClear();
    jvm->DetachCurrentThread();
    driver->abort();
    return;
  }

  jvm->DetachCurrentThread();
}
In src/sched/sched.cpp:
The abort() implementation exits early if status != DRIVER_RUNNING, and does not set the aborted flag.
Status MesosSchedulerDriver::abort()
{
  Lock lock(&mutex);

  if (status != DRIVER_RUNNING) {
    return status;
  }

  CHECK(process != NULL);

  // We set the volatile aborted to true here to prevent any further
  // messages from being processed in the SchedulerProcess. However,
  // if abort() is called from another thread as the SchedulerProcess,
  // there may be at most one additional message processed.
  // TODO(bmahler): Use an atomic boolean.
  process->aborted = true;

  // Dispatching here ensures that we still process the outstanding
  // requests *from* the scheduler, since those do proceed when
  // aborted is true.
  dispatch(process, &SchedulerProcess::abort);

  return status = DRIVER_ABORTED;
}
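As a result, if the scheduler has already called SchedulerDriver#stop (so status != DRIVER_RUNNING) by the time Scheduler#statusUpdate throws, abort() returns early without setting the aborted flag, and the driver sends the ACK despite the exception.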
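The sequence can be modeled in isolation. The following is a minimal sketch, not the Mesos code: ModelDriver, handleStatusUpdate and the stopFirst parameter are hypothetical names, and the model assumes that stop() only changes the status and leaves the aborted flag untouched. It keeps just the early return in abort() and the aborted check that gates the ACK.

```cpp
// Simplified stand-ins for the driver state (hypothetical, not the Mesos types).
enum Status { DRIVER_RUNNING, DRIVER_STOPPED, DRIVER_ABORTED };

struct ModelDriver {
  Status status = DRIVER_RUNNING;
  bool aborted = false;  // Plays the role of SchedulerProcess::aborted.

  // Assumption: stop() changes the status but never sets 'aborted'.
  Status stop() {
    if (status != DRIVER_RUNNING) {
      return status;
    }
    return status = DRIVER_STOPPED;
  }

  // Same shape as the abort() quoted above: the early return skips
  // the assignment to 'aborted'.
  Status abort() {
    if (status != DRIVER_RUNNING) {
      return status;
    }
    aborted = true;
    return status = DRIVER_ABORTED;
  }
};

// Models one status update whose callback throws. If 'stopFirst' is true,
// the callback calls stop() before throwing. The JNI layer then reacts to
// the exception by calling abort(). Returns true if an ACK would be sent.
bool handleStatusUpdate(ModelDriver& driver, bool stopFirst) {
  if (stopFirst) {
    driver.stop();  // Scheduler stops the driver inside the callback.
  }
  driver.abort();   // JNI layer saw the exception.
  return !driver.aborted;  // The ACK is gated only on the flag.
}
```

In this model, handleStatusUpdate(driver, false) returns false (no ACK, as intended), while handleStatusUpdate(driver, true) returns true: the update is acknowledged even though the callback threw.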