Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ class IFDv2SynchronizerFactory {
public:
virtual std::unique_ptr<IFDv2Synchronizer> Build() = 0;

[[nodiscard]] virtual bool IsFDv1Fallback() const { return false; }

virtual ~IFDv2SynchronizerFactory() = default;
IFDv2SynchronizerFactory(IFDv2SynchronizerFactory const&) = delete;
IFDv2SynchronizerFactory(IFDv2SynchronizerFactory&&) = delete;
Expand Down
15 changes: 14 additions & 1 deletion libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ void FDv2DataSystem::OnInitializerResult(
if (closed_ || got_shutdown) {
return;
}
if (result.fdv1_fallback) {
LD_LOG(logger_, LogLevel::kInfo)
<< Identity() << ": FDv1 fallback engaged";
source_manager_.SwitchToFDv1Fallback();
got_basis = true;
}
}

if (got_basis) {
Expand Down Expand Up @@ -349,7 +355,14 @@ void FDv2DataSystem::OnSynchronizerResult(
active_conditions_.reset();
return;
}
if (advance) {
if (result.fdv1_fallback) {
LD_LOG(logger_, LogLevel::kInfo)
<< Identity() << ": FDv1 fallback engaged";
source_manager_.SwitchToFDv1Fallback();
active_synchronizer_.reset();
active_conditions_.reset();
advance = true;
} else if (advance) {
source_manager_.BlockCurrentSynchronizer();
active_synchronizer_.reset();
active_conditions_.reset();
Expand Down
16 changes: 13 additions & 3 deletions libs/server-sdk/src/data_systems/fdv2/source_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ SourceManager::SourceManager(
std::vector<std::unique_ptr<IFDv2SynchronizerFactory>> factories) {
synchronizers_.reserve(factories.size());
for (auto& factory : factories) {
synchronizers_.push_back(
SynchronizerFactoryWithState{std::move(factory), State::kAvailable,
/*is_fdv1_fallback=*/false});
bool const is_fdv1_fallback = factory->IsFDv1Fallback();
synchronizers_.push_back(SynchronizerFactoryWithState{
std::move(factory),
is_fdv1_fallback ? State::kBlocked : State::kAvailable,
is_fdv1_fallback});
}
}

Expand Down Expand Up @@ -44,6 +46,14 @@ void SourceManager::ResetSourceIndex() {
synchronizer_index_ = -1;
}

void SourceManager::SwitchToFDv1Fallback() {
for (auto& entry : synchronizers_) {
entry.state =
entry.is_fdv1_fallback ? State::kAvailable : State::kBlocked;
}
synchronizer_index_ = -1;
}

bool SourceManager::IsPrimeSynchronizer() const {
for (std::size_t i = 0; i < synchronizers_.size(); ++i) {
if (synchronizers_[i].state == State::kAvailable) {
Expand Down
16 changes: 11 additions & 5 deletions libs/server-sdk/src/data_systems/fdv2/source_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ namespace launchdarkly::server_side::data_systems {
* by recovery, which wants to fall back to the most-preferred Available
* synchronizer.
*
* Each factory also carries an is_fdv1_fallback flag, currently always
* false. TODO: populate when the FDv1 fallback directive is implemented.
* Factories whose IsFDv1Fallback() returns true start in the Blocked state.
*
* Not thread-safe. The caller is responsible for serializing all calls.
*/
Expand Down Expand Up @@ -54,6 +53,14 @@ class SourceManager {
*/
void ResetSourceIndex();

/**
* Blocks every non-FDv1 factory and unblocks the FDv1 fallback factory,
* if one was configured. Resets the iteration cursor so the next call to
* NextSynchronizer returns the FDv1 fallback. If no FDv1 fallback factory
* was configured, every factory is left blocked.
*/
void SwitchToFDv1Fallback();

/**
* Returns true if the currently tracked factory is the first Available
* factory in the list. Returns false if no factory is currently tracked.
Expand All @@ -73,9 +80,8 @@ class SourceManager {
[[nodiscard]] std::size_t SynchronizerCount() const;

/**
* Returns true if the currently tracked factory was configured as the
* FDv1 fallback synchronizer. Always false until the FDv1 fallback
* directive is implemented.
* Returns true if the currently tracked factory is the FDv1 fallback
* synchronizer.
*/
[[nodiscard]] bool IsCurrentSynchronizerFDv1Fallback() const;

Expand Down
132 changes: 132 additions & 0 deletions libs/server-sdk/tests/fdv2_data_system_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,15 @@ class OneShotSynchronizerFactory : public IFDv2SynchronizerFactory {
std::unique_ptr<IFDv2Synchronizer> source_;
};

class FDv1FallbackOneShotFactory : public OneShotSynchronizerFactory {
public:
explicit FDv1FallbackOneShotFactory(
std::unique_ptr<IFDv2Synchronizer> source)
: OneShotSynchronizerFactory(std::move(source)) {}

bool IsFDv1Fallback() const override { return true; }
};

// Returns each pre-supplied source in order on successive Build() calls.
// Returns nullptr once the supply is exhausted. Used in tests that exercise
// wrap-around or recovery, where the same factory is built more than once.
Expand Down Expand Up @@ -1090,6 +1099,129 @@ TEST(FDv2DataSystemTest, SingleSynchronizerHasNoFallbackArmed) {
status_manager.Status().State());
}

// ============================================================================
// FDv1 fallback directive
// ============================================================================

TEST(FDv2DataSystemTest, SynchronizerFdv1FlagSwitchesToFdv1Adapter) {
auto logger = MakeNullLogger();
boost::asio::io_context ioc;
data_components::DataSourceStatusManager status_manager;

// FDv2 synchronizer emits a ChangeSet with the directive, then closes.
auto fdv2_sync =
std::make_unique<MockSynchronizer>(std::vector<FDv2SourceResult>{[]() {
FDv2SourceResult r{FDv2SourceResult::ChangeSet{
data_model::ChangeSet<ChangeSetData>{
data_model::ChangeSetType::kNone,
{},
data_model::Selector{}}}};
r.fdv1_fallback = true;
return r;
}()});
auto fdv2_factory =
std::make_unique<OneShotSynchronizerFactory>(std::move(fdv2_sync));

// FDv1 adapter returns Shutdown when reached, ending orchestration.
auto fdv1_sync =
std::make_unique<MockSynchronizer>(std::vector<FDv2SourceResult>{});
auto fdv1_factory =
std::make_unique<FDv1FallbackOneShotFactory>(std::move(fdv1_sync));
auto* fdv1_factory_ptr = fdv1_factory.get();

std::vector<std::unique_ptr<IFDv2SynchronizerFactory>> synchronizers;
synchronizers.push_back(std::move(fdv2_factory));
synchronizers.push_back(std::move(fdv1_factory));

FDv2DataSystem ds({}, std::move(synchronizers),
/*fallback_condition_factory=*/nullptr,
/*recovery_condition_factory=*/nullptr,
ioc.get_executor(), &status_manager, logger);
ds.Initialize();
ioc.run();

EXPECT_EQ(1, fdv1_factory_ptr->build_count_);
}

TEST(FDv2DataSystemTest, SynchronizerFdv1FlagWithoutAdapterTransitionsOff) {
auto logger = MakeNullLogger();
boost::asio::io_context ioc;
data_components::DataSourceStatusManager status_manager;

auto fdv2_sync =
std::make_unique<MockSynchronizer>(std::vector<FDv2SourceResult>{[]() {
FDv2SourceResult r{
FDv2SourceResult::Interrupted{FDv2SourceResult::ErrorInfo{
FDv2SourceResult::ErrorInfo::ErrorKind::kErrorResponse,
/*status_code=*/418, "directive",
std::chrono::system_clock::now()}}};
r.fdv1_fallback = true;
return r;
}()});
auto fdv2_factory =
std::make_unique<OneShotSynchronizerFactory>(std::move(fdv2_sync));

std::vector<std::unique_ptr<IFDv2SynchronizerFactory>> synchronizers;
synchronizers.push_back(std::move(fdv2_factory));

FDv2DataSystem ds({}, std::move(synchronizers),
/*fallback_condition_factory=*/nullptr,
/*recovery_condition_factory=*/nullptr,
ioc.get_executor(), &status_manager, logger);
ds.Initialize();
ioc.run();

EXPECT_EQ(DataSourceStatus::DataSourceState::kOff,
status_manager.Status().State());
}

TEST(FDv2DataSystemTest, InitializerFdv1FlagSwitchesToFdv1Adapter) {
auto logger = MakeNullLogger();
boost::asio::io_context ioc;
data_components::DataSourceStatusManager status_manager;

// Initializer returns Interrupted with the directive set.
FDv2SourceResult init_result{
FDv2SourceResult::Interrupted{FDv2SourceResult::ErrorInfo{
FDv2SourceResult::ErrorInfo::ErrorKind::kErrorResponse,
/*status_code=*/418, "directive",
std::chrono::system_clock::now()}}};
init_result.fdv1_fallback = true;
auto initializer =
std::make_unique<MockInitializer>(std::move(init_result));

std::vector<std::unique_ptr<IFDv2InitializerFactory>> initializers;
initializers.push_back(
std::make_unique<OneShotInitializerFactory>(std::move(initializer)));

auto fdv2_sync =
std::make_unique<MockSynchronizer>(std::vector<FDv2SourceResult>{});
auto fdv2_factory =
std::make_unique<OneShotSynchronizerFactory>(std::move(fdv2_sync));
auto* fdv2_factory_ptr = fdv2_factory.get();

auto fdv1_sync =
std::make_unique<MockSynchronizer>(std::vector<FDv2SourceResult>{});
auto fdv1_factory =
std::make_unique<FDv1FallbackOneShotFactory>(std::move(fdv1_sync));
auto* fdv1_factory_ptr = fdv1_factory.get();

std::vector<std::unique_ptr<IFDv2SynchronizerFactory>> synchronizers;
synchronizers.push_back(std::move(fdv2_factory));
synchronizers.push_back(std::move(fdv1_factory));

FDv2DataSystem ds(std::move(initializers), std::move(synchronizers),
/*fallback_condition_factory=*/nullptr,
/*recovery_condition_factory=*/nullptr,
ioc.get_executor(), &status_manager, logger);
ds.Initialize();
ioc.run();

// FDv2 synchronizer was skipped; FDv1 adapter was built and ran.
EXPECT_EQ(0, fdv2_factory_ptr->build_count_);
EXPECT_EQ(1, fdv1_factory_ptr->build_count_);
}

// ============================================================================
// Destruction protocol: in-flight orchestration
// ============================================================================
Expand Down
72 changes: 71 additions & 1 deletion libs/server-sdk/tests/source_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ class CountingFactory : public IFDv2SynchronizerFactory {
int build_count = 0;
};

class FDv1FallbackFactory : public CountingFactory {
public:
bool IsFDv1Fallback() const override { return true; }
};

} // namespace

TEST(SourceManagerTest, EmptyManagerReportsZeroAvailable) {
Expand Down Expand Up @@ -176,7 +181,7 @@ TEST(SourceManagerTest, ResetSourceIndexSkipsBlockedFirstFactory) {
EXPECT_EQ(1, f1_ptr->build_count);
}

TEST(SourceManagerTest, IsCurrentSynchronizerFDv1FallbackAlwaysFalse) {
TEST(SourceManagerTest, IsCurrentSynchronizerFDv1FallbackFalseForFDv2Factory) {
auto f0 = std::make_unique<CountingFactory>();
std::vector<std::unique_ptr<IFDv2SynchronizerFactory>> factories;
factories.push_back(std::move(f0));
Expand All @@ -185,3 +190,68 @@ TEST(SourceManagerTest, IsCurrentSynchronizerFDv1FallbackAlwaysFalse) {
mgr.NextSynchronizer();
EXPECT_FALSE(mgr.IsCurrentSynchronizerFDv1Fallback());
}

TEST(SourceManagerTest, FDv1FallbackFactoryStartsBlockedAndIsSkipped) {
auto fdv2 = std::make_unique<CountingFactory>();
auto fdv1 = std::make_unique<FDv1FallbackFactory>();
auto* fdv1_ptr = fdv1.get();
std::vector<std::unique_ptr<IFDv2SynchronizerFactory>> factories;
factories.push_back(std::move(fdv2));
factories.push_back(std::move(fdv1));
SourceManager mgr(std::move(factories));

EXPECT_EQ(1u, mgr.AvailableSynchronizerCount());
mgr.NextSynchronizer();
EXPECT_FALSE(mgr.IsCurrentSynchronizerFDv1Fallback());
EXPECT_EQ(0, fdv1_ptr->build_count);
}

TEST(SourceManagerTest, SwitchToFDv1FallbackBlocksFDv2AndUnblocksFDv1) {
auto fdv2 = std::make_unique<CountingFactory>();
auto fdv1 = std::make_unique<FDv1FallbackFactory>();
auto* fdv1_ptr = fdv1.get();
std::vector<std::unique_ptr<IFDv2SynchronizerFactory>> factories;
factories.push_back(std::move(fdv2));
factories.push_back(std::move(fdv1));
SourceManager mgr(std::move(factories));

mgr.SwitchToFDv1Fallback();

EXPECT_EQ(1u, mgr.AvailableSynchronizerCount());
auto sync = mgr.NextSynchronizer();
ASSERT_NE(sync, nullptr);
EXPECT_EQ(1, fdv1_ptr->build_count);
EXPECT_TRUE(mgr.IsCurrentSynchronizerFDv1Fallback());
}

TEST(SourceManagerTest, SwitchToFDv1FallbackWithoutAdapterBlocksEverything) {
auto fdv2 = std::make_unique<CountingFactory>();
std::vector<std::unique_ptr<IFDv2SynchronizerFactory>> factories;
factories.push_back(std::move(fdv2));
SourceManager mgr(std::move(factories));

mgr.SwitchToFDv1Fallback();

EXPECT_EQ(0u, mgr.AvailableSynchronizerCount());
EXPECT_EQ(nullptr, mgr.NextSynchronizer());
}

TEST(SourceManagerTest, SwitchToFDv1FallbackUnblocksPreviouslyBlockedFDv2) {
auto fdv2 = std::make_unique<CountingFactory>();
auto fdv1 = std::make_unique<FDv1FallbackFactory>();
auto* fdv1_ptr = fdv1.get();
std::vector<std::unique_ptr<IFDv2SynchronizerFactory>> factories;
factories.push_back(std::move(fdv2));
factories.push_back(std::move(fdv1));
SourceManager mgr(std::move(factories));

mgr.NextSynchronizer();
mgr.BlockCurrentSynchronizer();
mgr.SwitchToFDv1Fallback();

EXPECT_EQ(1u, mgr.AvailableSynchronizerCount());
auto sync = mgr.NextSynchronizer();
ASSERT_NE(sync, nullptr);
EXPECT_EQ(1, fdv1_ptr->build_count);
EXPECT_TRUE(mgr.IsCurrentSynchronizerFDv1Fallback());
}
Loading