Skip to content

Commit 1de1159

Browse files
committed
chore: sync github repos if fullSync
Signed-off-by: Uroš Marolt <uros@marolt.me>
1 parent d225d97 commit 1de1159

12 files changed

Lines changed: 343 additions & 113 deletions

File tree

backend/src/bin/jobs/refreshGithubRepoSettings.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/* eslint-disable no-continue */
22
import cronGenerator from 'cron-time-generator'
33

4-
import { timeout } from '@crowd/common'
4+
import { IS_DEV_ENV, timeout } from '@crowd/common'
55
import { getServiceChildLogger } from '@crowd/logging'
66

77
import SequelizeRepository from '../../database/repositories/sequelizeRepository'
@@ -59,7 +59,7 @@ export const refreshGithubRepoSettings = async () => {
5959
const job: CrowdJob = {
6060
name: 'Refresh Github repo settings',
6161
// every day (every 5 minutes when IS_DEV_ENV is set)
62-
cronTime: cronGenerator.every(1).days(),
62+
cronTime: IS_DEV_ENV ? cronGenerator.every(5).minutes() : cronGenerator.every(1).days(),
6363
onTrigger: async () => {
6464
await refreshGithubRepoSettings()
6565
},

backend/src/segment/track.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { getServiceChildLogger } from '@crowd/logging'
22
import { Edition } from '@crowd/types'
33

4-
import { API_CONFIG, IS_TEST_ENV, SEGMENT_CONFIG } from '../conf'
4+
import { API_CONFIG, IS_DEV_ENV, IS_TEST_ENV, SEGMENT_CONFIG } from '../conf'
55
import SequelizeRepository from '../database/repositories/sequelizeRepository'
66

77
import { CROWD_ANALYTICS_PLATORM_NAME } from './addProductDataToCrowdTenant'
@@ -21,6 +21,7 @@ export default async function identify(
2121
}).email
2222
if (
2323
!IS_TEST_ENV &&
24+
!IS_DEV_ENV &&
2425
SEGMENT_CONFIG.writeKey &&
2526
// This is only for events in the hosted version. Self-hosted has less telemetry.
2627
(API_CONFIG.edition === Edition.CROWD_HOSTED || API_CONFIG.edition === Edition.LFX) &&
@@ -41,6 +42,10 @@ export default async function identify(
4142

4243
const { userIdOut, tenantIdOut } = getTenatUser(userId, options)
4344

45+
if (!userIdOut) {
46+
return
47+
}
48+
4449
const payload = {
4550
userId: userIdOut,
4651
event,

backend/src/services/integrationService.ts

Lines changed: 10 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@ import { request } from '@octokit/request'
44
import axios, { AxiosRequestConfig, AxiosResponse } from 'axios'
55
import lodash from 'lodash'
66
import moment from 'moment'
7-
import { QueryTypes, Transaction } from 'sequelize'
8-
import { v4 as uuidv4 } from 'uuid'
7+
import { Transaction } from 'sequelize'
98

109
import { EDITION, Error400, Error404, Error542 } from '@crowd/common'
1110
import { getGithubInstallationToken } from '@crowd/common_services'
@@ -15,6 +14,7 @@ import {
1514
deleteSegmentRepositories,
1615
upsertSegmentRepositories,
1716
} from '@crowd/data-access-layer/src/collections'
17+
import { syncRepositoriesToGitV2 } from '@crowd/data-access-layer/src/integrations'
1818
import {
1919
NangoIntegration,
2020
connectNangoIntegration,
@@ -908,7 +908,7 @@ export default class IntegrationService {
908908
args: [{ integrationIds: [integration.id] }],
909909
})
910910

911-
return await txService.findById(integrationId)
911+
return await this.findById(integration.id)
912912
} catch (err) {
913913
this.options.log.error(err, 'Error while creating or updating GitHub integration!')
914914
if (!existingTransaction) {
@@ -1340,11 +1340,15 @@ export default class IntegrationService {
13401340
)
13411341

13421342
// upsert repositories to git.repositories in order to be processed by git-integration V2
1343-
await this.syncRepositoriesToGitV2(
1344-
remotes,
1345-
options || this.options,
1343+
const qx = SequelizeRepository.getQueryExecutor({
1344+
...(options || this.options),
13461345
transaction,
1346+
})
1347+
await syncRepositoriesToGitV2(
1348+
qx,
1349+
remotes,
13471350
integration.id,
1351+
(options || this.options).currentSegments[0].id,
13481352
)
13491353

13501354
// Only commit if we created the transaction ourselves
@@ -1362,95 +1366,6 @@ export default class IntegrationService {
13621366
return integration
13631367
}
13641368

1365-
/**
1366-
* Syncs repositories to git.repositories table (git-integration V2)
1367-
* @param remotes Array of repository objects with url and optional forkedFrom
1368-
* @param options Repository options
1369-
* @param transaction Database transaction
1370-
* @param integrationId The integration ID from the git integration
1371-
* @param inheritFromExistingRepos If true, queries githubRepos and gitlabRepos for IDs; if false, generates new UUIDs
1372-
*
1373-
* TODO: @Mouad After migration is complete, simplify this function by:
1374-
* 1. Using an object parameter instead of multiple parameters for better maintainability
1375-
* 2. Removing the inheritFromExistingRepos parameter since git.repositories will be the source of truth
1376-
* 3. Simplifying the logic to only handle git.repositories operations
1377-
*/
1378-
private async syncRepositoriesToGitV2(
1379-
remotes: Array<{ url: string; forkedFrom?: string | null }>,
1380-
options: IRepositoryOptions,
1381-
transaction: Transaction,
1382-
integrationId: string,
1383-
) {
1384-
const seq = SequelizeRepository.getSequelize(options)
1385-
1386-
let repositoriesToSync: Array<{
1387-
id: string
1388-
url: string
1389-
integrationId: string
1390-
segmentId: string
1391-
forkedFrom?: string | null
1392-
}> = []
1393-
// check GitHub repos first, fallback to GitLab repos if none found
1394-
const existingRepos: Array<{
1395-
id: string
1396-
url: string
1397-
}> = await seq.query(
1398-
`
1399-
WITH github_repos AS (
1400-
SELECT id, url FROM "githubRepos"
1401-
WHERE url IN (:urls) AND "deletedAt" IS NULL
1402-
),
1403-
gitlab_repos AS (
1404-
SELECT id, url FROM "gitlabRepos"
1405-
WHERE url IN (:urls) AND "deletedAt" IS NULL
1406-
)
1407-
SELECT id, url FROM github_repos
1408-
UNION ALL
1409-
SELECT id, url FROM gitlab_repos
1410-
WHERE NOT EXISTS (SELECT 1 FROM github_repos)
1411-
`,
1412-
{
1413-
replacements: {
1414-
urls: remotes.map((r) => r.url),
1415-
},
1416-
type: QueryTypes.SELECT,
1417-
transaction,
1418-
},
1419-
)
1420-
1421-
// Create a map of url to forkedFrom for quick lookup
1422-
const forkedFromMap = new Map(remotes.map((r) => [r.url, r.forkedFrom]))
1423-
1424-
repositoriesToSync = existingRepos.map((repo) => ({
1425-
id: repo.id,
1426-
url: repo.url,
1427-
integrationId,
1428-
segmentId: options.currentSegments[0].id,
1429-
forkedFrom: forkedFromMap.get(repo.url) || null,
1430-
}))
1431-
1432-
if (repositoriesToSync.length === 0) {
1433-
this.options.log.warn(
1434-
'No existing repos found in githubRepos or gitlabRepos - inserting new to git.repositories with new uuid',
1435-
)
1436-
repositoriesToSync = remotes.map((remote) => ({
1437-
id: uuidv4(), // Generate new UUID
1438-
url: remote.url,
1439-
integrationId,
1440-
segmentId: options.currentSegments[0].id,
1441-
forkedFrom: remote.forkedFrom || null,
1442-
}))
1443-
}
1444-
1445-
// Sync to git.repositories v2
1446-
await GitReposRepository.upsert(repositoriesToSync, {
1447-
...options,
1448-
transaction,
1449-
})
1450-
1451-
this.options.log.info(`Synced ${repositoriesToSync.length} repos to git v2`)
1452-
}
1453-
14541369
async atlassianAdminConnect(adminApi: string, organizationId: string) {
14551370
const nangoPayload = {
14561371
params: {

services/apps/cron_service/src/jobs/nangoGithubSync.job.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import CronTime from 'cron-time-generator'
22

3+
import { IS_DEV_ENV } from '@crowd/common'
34
import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
45
import { fetchNangoIntegrationData } from '@crowd/data-access-layer/src/integrations'
56
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
@@ -12,7 +13,7 @@ import { IJobDefinition } from '../types'
1213
const job: IJobDefinition = {
1314
name: 'nango-github-sync',
1415
cronTime: CronTime.every(
15-
Number(process.env.CROWD_GH_NANGO_SYNC_INTERVAL_MINUTES || 60),
16+
// Parenthesize the fallback: `a || b ? x : y` parses as `(a || b) ? x : y`, which would ignore the configured interval.
Number(process.env.CROWD_GH_NANGO_SYNC_INTERVAL_MINUTES || (IS_DEV_ENV ? 5 : 60)),
1617
).minutes(),
1718
timeout: 10 * 60,
1819
process: async (ctx) => {

services/apps/nango_worker/src/activities.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@ import {
22
analyzeGithubIntegration,
33
createGithubConnection,
44
deleteConnection,
5+
mapGithubRepo,
56
numberOfGithubConnectionsToCreate,
67
processNangoWebhook,
78
removeGithubConnection,
89
setGithubConnection,
910
startNangoSync,
1011
unmapGithubRepo,
12+
updateGitIntegrationWithRepo,
1113
} from './activities/nangoActivities'
1214

1315
export {
@@ -18,6 +20,8 @@ export {
1820
removeGithubConnection,
1921
setGithubConnection,
2022
startNangoSync,
23+
mapGithubRepo,
2124
unmapGithubRepo,
2225
numberOfGithubConnectionsToCreate,
26+
updateGitIntegrationWithRepo,
2327
}

services/apps/nango_worker/src/activities/nangoActivities.ts

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import { IS_DEV_ENV, IS_STAGING_ENV, singleOrDefault } from '@crowd/common'
22
import GithubIntegrationService from '@crowd/common_services/src/services/github.integration.service'
33
import {
4+
addGitHubRepoMapping,
45
addGithubNangoConnection,
6+
addRepoToGitIntegration,
57
fetchIntegrationById,
68
findIntegrationDataForNangoWebhookProcessing,
79
removeGitHubRepoMapping,
@@ -55,7 +57,7 @@ export async function numberOfGithubConnectionsToCreate(): Promise<number> {
5557

5658
if (IS_DEV_ENV || IS_STAGING_ENV) {
5759
// Keep the logged count in sync with the value returned below for dev/staging.
svc.log.info('[GITHUB] Number of github connections to create: 10')
58-
return 5
60+
return 10
5961
}
6062

6163
const lastConnectDate = await getLastConnectTs()
@@ -186,6 +188,10 @@ export async function analyzeGithubIntegration(
186188
repo: IGithubRepoData
187189
connectionId: string
188190
}[] = []
191+
const duplicatesToDelete: {
192+
repo: IGithubRepoData
193+
connectionId: string
194+
}[] = []
189195
const reposToSync: IGithubRepoData[] = []
190196

191197
const integration = await fetchIntegrationById(dbStoreQx(svc.postgres.writer), integrationId)
@@ -238,19 +244,44 @@ export async function analyzeGithubIntegration(
238244
if (settings.nangoMapping) {
239245
const nangoMapping = settings.nangoMapping as Record<string, IGithubRepoData>
240246

241-
for (const connectionId of Object.keys(nangoMapping)) {
247+
const connectionIds = Object.keys(nangoMapping)
248+
249+
// check for duplicates as well by tracking which repos have connectionIds
250+
// Explicit element type so the `.some` predicate below is checked against IGithubRepoData instead of an implicit any[].
const existingConnectedRepos: IGithubRepoData[] = []
251+
for (const connectionId of connectionIds) {
242252
const mappedRepo = nangoMapping[connectionId]
243-
const found = singleOrDefault(
244-
finalRepos,
245-
(r) => r.owner === mappedRepo.owner && r.repoName === mappedRepo.repoName,
246-
)
247253

248-
// if repo is in nangoMapping but not in settings delete the connection
249-
if (!found) {
250-
reposToDelete.push({
254+
if (
255+
existingConnectedRepos.some(
256+
(r) => r.owner === mappedRepo.owner && r.repoName === mappedRepo.repoName,
257+
)
258+
) {
259+
// found duplicate connectionId for the same repo
260+
duplicatesToDelete.push({
251261
repo: mappedRepo,
252262
connectionId,
253263
})
264+
265+
// just so that later singleOrDefault doesn't find it
266+
delete nangoMapping[connectionId]
267+
} else {
268+
const found = singleOrDefault(
269+
finalRepos,
270+
(r) => r.owner === mappedRepo.owner && r.repoName === mappedRepo.repoName,
271+
)
272+
273+
// if repo is in nangoMapping but not in settings delete the connection
274+
if (!found) {
275+
reposToDelete.push({
276+
repo: mappedRepo,
277+
connectionId,
278+
})
279+
280+
// just so that later singleOrDefault doesn't find it
281+
delete nangoMapping[connectionId]
282+
} else {
283+
existingConnectedRepos.push(mappedRepo)
284+
}
254285
}
255286
}
256287
}
@@ -290,6 +321,7 @@ export async function analyzeGithubIntegration(
290321
return {
291322
providerConfigKey: NangoIntegration.GITHUB,
292323
reposToDelete,
324+
duplicatesToDelete,
293325
reposToSync,
294326
}
295327
}
@@ -382,6 +414,15 @@ export async function deleteConnection(
382414
await deleteNangoConnection(providerConfigKey as NangoIntegration, connectionId)
383415
}
384416

417+
export async function mapGithubRepo(integrationId: string, repo: IGithubRepoData): Promise<void> {
418+
await addGitHubRepoMapping(
419+
dbStoreQx(svc.postgres.writer),
420+
integrationId,
421+
repo.owner,
422+
repo.repoName,
423+
)
424+
}
425+
385426
export async function unmapGithubRepo(integrationId: string, repo: IGithubRepoData): Promise<void> {
386427
// remove repo from githubRepos mapping
387428
await removeGitHubRepoMapping(
@@ -393,6 +434,14 @@ export async function unmapGithubRepo(integrationId: string, repo: IGithubRepoDa
393434
)
394435
}
395436

437+
export async function updateGitIntegrationWithRepo(
438+
integrationId: string,
439+
repo: IGithubRepoData,
440+
): Promise<void> {
441+
const repoUrl = `https://github.com/${repo.owner}/${repo.repoName}`
442+
await addRepoToGitIntegration(dbStoreQx(svc.postgres.writer), integrationId, repoUrl)
443+
}
444+
396445
function parseGithubUrl(url: string): IGithubRepoData {
397446
// Create URL object
398447
const parsedUrl = new URL(url)

services/apps/nango_worker/src/types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ export interface IGithubIntegrationSyncInstructions {
1717
repo: IGithubRepoData
1818
connectionId: string
1919
}[]
20+
duplicatesToDelete: {
21+
repo: IGithubRepoData
22+
connectionId: string
23+
}[]
2024
reposToSync: IGithubRepoData[]
2125
}
2226

services/apps/nango_worker/src/workflows/syncGithubIntegration.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,17 @@ export async function syncGithubIntegration(args: ISyncGithubIntegrationArgument
2626
await activity.unmapGithubRepo(integrationId, repo.repo)
2727
}
2828

29+
// delete duplicate connections
30+
for (const repo of result.duplicatesToDelete) {
31+
// delete nango connection
32+
await activity.deleteConnection(result.providerConfigKey, repo.connectionId)
33+
34+
// delete connection from integrations.settings.nangoMapping object
35+
await activity.removeGithubConnection(integrationId, repo.connectionId)
36+
37+
// we don't unmap because this one was duplicated
38+
}
39+
2940
// create connections for repos that are not already connected
3041
for (const repo of result.reposToSync) {
3142
if (created >= limit) {
@@ -38,6 +49,12 @@ export async function syncGithubIntegration(args: ISyncGithubIntegrationArgument
3849
// add connection to integrations.settings.nangoMapping object
3950
await activity.setGithubConnection(integrationId, repo, connectionId)
4051

52+
// add repo to githubRepos mapping if it's not already mapped
53+
await activity.mapGithubRepo(integrationId, repo)
54+
55+
// add repo to git integration
56+
await activity.updateGitIntegrationWithRepo(integrationId, repo)
57+
4158
// start nango sync
4259
await activity.startNangoSync(result.providerConfigKey, connectionId)
4360

0 commit comments

Comments
 (0)