
Flow Tutorial: How Things Do Not Work

Jingyu Zhou edited this page Nov 12, 2021 · 1 revision

General concept

  • Single threaded
  • Calling ACTOR function creates an object and does NOT block
    • Hold the returned Future object; otherwise the actor is cancelled. Good pattern: add the future to an actor collection and wait on the collection. If nothing waits on the future, an exception thrown from the actor is silently ignored (not raised to the caller function!). E.g., masterServer().
ACTOR Future<Void> promiseDemo() {
	state Promise<int> promise;
	Future<Void> f = someFuture(promise.getFuture());
	wait(delay(3.0));
	promise.send(2);
	//wait(f);
	return Void();
}
  • ACTOR chaining and exceptions
  • wait() on Future to receive the exception
ACTOR Future<Void> someFuture(Future<int> ready) {
	loop choose {
		when(wait(delay(0.5))) { std::cout << "Still waiting...\n"; }
		when(int r = wait(ready)) {
			std::cout << format("Ready %d\n", r);
			throw io_error(); // compiled away
		}
	}
}

ACTOR Future<Void> promiseDemo() {
	try {
		state Promise<int> promise;
		state Future<Void> f = someFuture(promise.getFuture());
		wait(delay(3.0));
		promise.send(2);
		wait(delay(1.0));
		std::cout << "f isReady: " << f.isReady() << ", isError: " << f.isError() << "\n";
	} catch (Error& e) {
		std::cout << "Caught error: " << e.name() << "\n";
	}
	return Void();
}
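
The point that an error only reaches the caller who waits on the future can be illustrated outside Flow. The following is a minimal sketch in standard C++, not Flow code: `std::promise` and `std::future` stand in for Flow's `Promise` and `Future`, `f.get()` plays the role of `wait(f)`, and the names `failingTask`, `observeError`, and `checkObserveError` are hypothetical.

```cpp
#include <cassert>
#include <exception>
#include <future>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for an actor that fails: the error is stored in the
// returned future instead of being thrown at the caller.
std::future<int> failingTask() {
    std::promise<int> p;
    std::future<int> f = p.get_future();
    p.set_exception(std::make_exception_ptr(std::runtime_error("io_error")));
    return f;
}

// Returns the message of the error seen by the caller, or an empty string
// if the caller never consumed the future and so never saw the error.
std::string observeError(bool consume) {
    std::future<int> f = failingTask(); // does not throw here
    if (!consume) {
        return ""; // future dropped: the stored error is silently lost
    }
    try {
        f.get(); // rethrows the stored exception (analogous to wait(f))
    } catch (const std::runtime_error& e) {
        return e.what();
    }
    return "";
}

// Usage: the error is visible only when the future is consumed.
void checkObserveError() {
    assert(observeError(true) == "io_error");
    assert(observeError(false).empty());
}
```

In this analogy, the second `promiseDemo()` above behaves like `observeError(false)`: it inspects `f` but never waits on it, so its catch block does not see the error.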
  • All parameters are const &
    • Shadowing argument, e.g., std::vector<Future<Void>>, iterator
ACTOR Future<Void> fun(std::vector<Future<Void>> actors, Future<int> ready) {
	wait(delay(1.0));
	std::cout << "fun runs...\n";
	actors.push_back(someFuture(ready));
	std::cout << "fun returns...\n";
	return Void();
}

ACTOR Future<Void> promiseDemo() {
	state Promise<int> promise;
	state std::vector<Future<Void>> actors;
	state Future<Void> f = fun(actors, promise.getFuture());
	wait(delay(3.0));
	promise.send(2);
	wait(f);
	return Void();
}

// segfault:
ACTOR Future<Void> fun(std::vector<Future<Void>> actors, Future<int> ready) {
	state std::vector<Future<Void>>::const_iterator end = actors.end();

	wait(delay(1.0));
	std::cout << "fun runs...\n";
	actors.push_back(someFuture(ready));

	for (auto it = actors.begin(); it != end; it++) {
		std::cout << it->isReady() << "\n";
	}
	std::cout << "fun returns...\n";
	return Void();
}
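
The two pitfalls above have plain C++ counterparts. The sketch below is an analogy under assumptions, not Flow code: it assumes the actor works on its own copy of the vector argument, and it assumes the segfault comes from the saved `end` iterator being invalidated by `push_back`. The function names are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Stand-in for an actor that receives a vector: it appends to its own copy,
// so the caller's vector is left unchanged.
std::size_t appendToCopy(const std::vector<int>& callerItems) {
    std::vector<int> items = callerItems; // private copy
    items.push_back(42);                  // visible only in the copy
    return items.size();
}

// Safer alternative to saving end() before a push_back: remember a count.
// An index stays valid after reallocation; a saved iterator does not.
int sumOriginalItems(std::vector<int>& items) {
    const std::size_t originalCount = items.size();
    items.push_back(100); // may reallocate and invalidate saved iterators
    int sum = 0;
    for (std::size_t i = 0; i < originalCount; ++i) {
        sum += items[i];
    }
    return sum;
}

// Usage: the caller's vector is untouched by the copy, and the index-based
// loop visits exactly the elements that existed before the push_back.
void checkCopyAndIndexExample() {
    std::vector<int> caller{1, 2, 3};
    assert(appendToCopy(caller) == 4);
    assert(caller.size() == 3);
    assert(sumOriginalItems(caller) == 6);
    assert(caller.size() == 4);
}
```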
  • Reduce state variables
  • state X x(params) results in a "no matching function for call to 'X::X()'" error, i.e., a type used as a state variable needs a default constructor.
  • Broken promise: on the RPC server side, the ReplyPromise is destroyed before a reply is sent. E.g.,
ACTOR Future<Void> echoServer() {
	state EchoServerInterface echoServer;
	echoServer.getInterface.makeWellKnownEndpoint(UID(-1, ++tokenCounter), TaskPriority::DefaultEndpoint);
	loop {
		choose {
			// req goes out of scope without a reply being sent
			when(EchoRequest req = waitNext(echoServer.echo.getFuture())) { break; }
		}
	}

	loop {
		choose {
			when(GetInterfaceRequest req = waitNext(echoServer.getInterface.getFuture())) {
				req.reply.send(echoServer);
			}
			when(EchoRequest req = waitNext(echoServer.echo.getFuture())) { req.reply.send(req.message); }
			when(ReverseRequest req = waitNext(echoServer.reverse.getFuture())) {
				req.reply.send(std::string(req.message.rbegin(), req.message.rend()));
			}
		}
	}
}
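
The same failure mode exists in standard C++, which makes it easy to try in isolation. This is a minimal sketch, not Flow code: `std::promise` stands in for `ReplyPromise`, the `Request` struct is loosely modelled on `EchoRequest`, and all function names are hypothetical.

```cpp
#include <cassert>
#include <future>
#include <string>
#include <utility>

// A request that carries its own reply promise.
struct Request {
    std::string message;
    std::promise<std::string> reply;
};

// Server that replies before the request is destroyed.
void echoAndReply(Request req) {
    req.reply.set_value(req.message);
}

// Server that drops the request: req.reply is destroyed without a value,
// which marks the client's future with a broken_promise error.
void dropWithoutReply(Request req) {
    (void)req;
}

// Returns true if the client observed a broken promise.
bool replyIsBroken(bool serverReplies) {
    Request req;
    req.message = "Hello World";
    std::future<std::string> reply = req.reply.get_future();
    if (serverReplies) {
        echoAndReply(std::move(req));
    } else {
        dropWithoutReply(std::move(req));
    }
    try {
        reply.get();
        return false;
    } catch (const std::future_error& e) {
        return e.code() == std::future_errc::broken_promise;
    }
}

// Usage: only the server that drops the request breaks the promise.
void checkBrokenPromiseExample() {
    assert(replyIsBroken(false));
    assert(!replyIsBroken(true));
}
```

The first loop of `echoServer()` above corresponds to `dropWithoutReply`: the request is received and then destroyed without `reply.send()` being called.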

// Related problem: Assertion !getReplyPromise(value).getFuture().isReady() failed 
	state EchoRequest echoRequest;

	state int i = 0;
	for ( ; i < 3; i++) {
		echoRequest.message = "Hello World";
		echoRequest.reply.reset();
  loop {
    try {
      choose {
        when ( GetRecoveryInfoReply infoReply = wait( brokenPromiseToNever(self->dbInfo->get().master.getRecoveryInfo.getReply(GetRecoveryInfoRequest(di.id)) )) ) {
          configuration->set( infoReply.configuration );
          recoveryTransactionVersion = infoReply.recoveryTransactionVersion;
          TraceEvent("DataDistributor", di.id)
            .detail("RecoveryVersion", infoReply.recoveryTransactionVersion)
            .detail("Configuration", configuration->get().toString());
          break;
        }
        when ( wait(self->dbInfo->onChange()) ) {}
      }
    }
    catch (Error& e) {
      if ( e.code() != error_code_broken_promise ) {
        TraceEvent("DataDistributorError", di.id).error(e);
        throw;
      }
    }
  }

In the above code, brokenPromiseToNever() is important. Without it, a broken promise raises an exception that is caught, but on the next iteration we can still get a bad self->dbInfo. In contrast, brokenPromiseToNever() turns the broken promise into a future that never becomes ready. As a result, the choose waits until dbInfo->onChange() completes, picks up the new dbInfo, and starts the loop again. This time, self->dbInfo is correct.

  • Very rare: uncancellable ACTORs, i.e., ACTORs declared with a void return type. To find them: grep ACTOR */*.actor.cpp | grep void
  • static ACTOR member function
  • Flow priorities
  • Flow locks

Generics

Resources