diff --git a/backend/src/database/migrations/V1772556158__addTncActivityTypes.sql b/backend/src/database/migrations/V1772556158__addTncActivityTypes.sql
new file mode 100644
index 0000000000..790fa31fc3
--- /dev/null
+++ b/backend/src/database/migrations/V1772556158__addTncActivityTypes.sql
@@ -0,0 +1,9 @@
+-- Activity types emitted for the 'tnc' platform.
+-- NOTE(review): attempted-course / attempted-exam are produced from 'course_started'
+-- course actions, while their descriptions say "completed" - confirm the wording.
+INSERT INTO "activityTypes" ("activityType", platform, "isCodeContribution", "isCollaboration", description, "label") VALUES
+('enrolled-certification', 'tnc', false, false, 'Successful payment purchase of certification enrollment', 'Enrolled in certification'),
+('enrolled-training', 'tnc', false, false, 'Successful payment purchase of training enrollment', 'Enrolled in training'),
+('issued-certification', 'tnc', false, false, 'User is granted a certification', 'Issued certification'),
+('attempted-course', 'tnc', false, false, 'Certification course is completed', 'Attempted course'),
+('attempted-exam', 'tnc', false, false, 'Certification exam is completed', 'Attempted exam');
diff --git a/services/apps/snowflake_connectors/src/core/transformerBase.ts b/services/apps/snowflake_connectors/src/core/transformerBase.ts
index cdbeeca284..33caeec635 100644
--- a/services/apps/snowflake_connectors/src/core/transformerBase.ts
+++ b/services/apps/snowflake_connectors/src/core/transformerBase.ts
@@ -36,7 +36,18 @@ export abstract class TransformerBase {
     try {
       return this.transformRow(row)
     } catch (err) {
-      log.warn({ err, platform: this.platform }, 'Failed to transform row, skipping')
+      // Log the error text and the row's column names only, not the row values
+      const message = err instanceof Error ? err.message : String(err)
+      const stack = err instanceof Error ? err.stack : undefined
+      log.warn(
+        {
+          errMessage: message,
+          errStack: stack,
+          platform: this.platform,
+          rowKeys: Object.keys(row),
+        },
+        'Failed to transform row, skipping',
+      )
       return null
     }
   }
diff --git a/services/apps/snowflake_connectors/src/integrations/index.ts b/services/apps/snowflake_connectors/src/integrations/index.ts
index 84f6f4656f..7adc90bec0 100644
--- a/services/apps/snowflake_connectors/src/integrations/index.ts
+++ b/services/apps/snowflake_connectors/src/integrations/index.ts
@@ -8,6 +8,12 @@ import { PlatformType } from '@crowd/types'
 
 import { buildSourceQuery as cventBuildSourceQuery } from './cvent/event-registrations/buildSourceQuery'
 import { CventTransformer } from './cvent/event-registrations/transformer'
+import { buildSourceQuery as tncCertificatesBuildQuery } from './tnc/certificates/buildSourceQuery'
+import { TncCertificatesTransformer } from './tnc/certificates/transformer'
+import { buildSourceQuery as tncCoursesBuildQuery } from './tnc/courses/buildSourceQuery'
+import { TncCoursesTransformer } from './tnc/courses/transformer'
+import { buildSourceQuery as tncEnrollmentsBuildQuery } from './tnc/enrollments/buildSourceQuery'
+import { TncEnrollmentsTransformer } from './tnc/enrollments/transformer'
 import { DataSource, DataSourceName, PlatformDefinition } from './types'
 
 export type { BuildSourceQuery, DataSource, PlatformDefinition } from './types'
@@ -23,6 +29,26 @@ const supported: Partial<Record<PlatformType, PlatformDefinition>> = {
       },
     ],
   },
+  // TNC is exported as three independent sources, each with its own query and transformer
+  [PlatformType.TNC]: {
+    sources: [
+      {
+        name: DataSourceName.TNC_ENROLLMENTS,
+        buildSourceQuery: tncEnrollmentsBuildQuery,
+        transformer: new TncEnrollmentsTransformer(),
+      },
+      {
+        name: DataSourceName.TNC_CERTIFICATES,
+        buildSourceQuery: tncCertificatesBuildQuery,
+        transformer: new TncCertificatesTransformer(),
+      },
+      {
+        name: DataSourceName.TNC_COURSES,
+        buildSourceQuery: tncCoursesBuildQuery,
+        transformer: new TncCoursesTransformer(),
+      },
+    ],
+  },
 }
 
 const enabled = (process.env.CROWD_SNOWFLAKE_ENABLED_PLATFORMS || '')
diff --git a/services/apps/snowflake_connectors/src/integrations/tnc/certificates/buildSourceQuery.ts b/services/apps/snowflake_connectors/src/integrations/tnc/certificates/buildSourceQuery.ts
new file mode 100644
index 0000000000..ffb79daf7e
--- /dev/null
+++ b/services/apps/snowflake_connectors/src/integrations/tnc/certificates/buildSourceQuery.ts
@@ -0,0 +1,119 @@
+import { IS_PROD_ENV } from '@crowd/common'
+
+// Main: analytics.silver_fact.certificates (certificate data)
+// Joins:
+// - analytics.silver_dim._crowd_dev_segments_union (segment resolution)
+// - analytics.bronze_fivetran_salesforce.bronze_salesforce_merged_user (LFID)
+// - analytics.silver_dim.users (LFID fallback)
+// - analytics.bronze_fivetran_salesforce.accounts + analytics.bronze_fivetran_salesforce_b2b.accounts (org data)
+
+const CDP_MATCHED_SEGMENTS = `
+  cdp_matched_segments AS (
+    SELECT DISTINCT
+      s.SOURCE_ID AS sourceId,
+      s.slug
+    FROM ANALYTICS.SILVER_DIM._CROWD_DEV_SEGMENTS_UNION s
+    WHERE s.PARENT_SLUG IS NOT NULL
+      AND s.GRANDPARENTS_SLUG IS NOT NULL
+      AND s.SOURCE_ID IS NOT NULL
+  )`
+
+const ORG_ACCOUNTS = `
+  org_accounts AS (
+    SELECT account_id, account_name, website, domain_aliases, LOGO_URL, INDUSTRY, N_EMPLOYEES
+    FROM analytics.bronze_fivetran_salesforce.accounts
+    WHERE website IS NOT NULL
+    UNION ALL
+    SELECT account_id, account_name, website, domain_aliases, NULL AS LOGO_URL, NULL AS INDUSTRY, NULL AS N_EMPLOYEES
+    FROM analytics.bronze_fivetran_salesforce_b2b.accounts
+    WHERE website IS NOT NULL
+  )`
+
+const LFID_COALESCE = `COALESCE(mu.user_name, u.lf_username)`
+
+/**
+ * Builds the Snowflake export query for TNC certificates.
+ *
+ * Without `sinceTimestamp` every certificate in a matched segment is selected. With it,
+ * the result is the UNION of (a) certificates whose issued_ts is at or after the timestamp
+ * and (b) all certificates of segments created at or after it. Outside production the
+ * query is limited to the 'cncf' segment.
+ *
+ * NOTE(review): `sinceTimestamp` is spliced into the SQL as a quoted literal, so it is
+ * assumed to be a trusted, well-formed timestamp - confirm against the caller.
+ *
+ * @param sinceTimestamp - lower bound for an incremental export; omit for a full export
+ * @returns the SQL text with surrounding whitespace trimmed
+ */
+export const buildSourceQuery = (sinceTimestamp?: string): string => {
+  let select = `
+  SELECT
+    c.*,
+    cms.slug AS PROJECT_SLUG,
+    org.account_name AS ORGANIZATION_NAME,
+    org.website AS ORG_WEBSITE,
+    org.domain_aliases AS ORG_DOMAIN_ALIASES,
+    org.logo_url AS LOGO_URL,
+    org.industry AS ORGANIZATION_INDUSTRY,
+    CAST(org.n_employees AS VARCHAR) AS ORGANIZATION_SIZE,
+    ${LFID_COALESCE} AS LFID
+  FROM analytics.silver_fact.certificates c
+  INNER JOIN cdp_matched_segments cms
+    ON cms.sourceId = c.project_id
+  LEFT JOIN analytics.bronze_fivetran_salesforce.bronze_salesforce_merged_user mu
+    ON c.user_id = mu.user_id
+    AND mu.user_name IS NOT NULL
+  LEFT JOIN analytics.silver_dim.users u
+    ON LOWER(c.user_email) = LOWER(u.email)
+    AND u.lf_username IS NOT NULL
+  LEFT JOIN org_accounts org
+    ON c.account_id = org.account_id
+  WHERE c.user_email IS NOT NULL`
+
+  // Limit to a single project in non-prod to avoid exporting all projects data
+  if (!IS_PROD_ENV) {
+    select += ` AND cms.slug = 'cncf'`
+  }
+
+  // Keep one row per certificate. DESC sorts NULLs first in Snowflake by default, so
+  // NULLS LAST is required for a row that carries an organization website to win.
+  const dedup = `
+  QUALIFY ROW_NUMBER() OVER (PARTITION BY c.certificate_id ORDER BY org.website DESC NULLS LAST) = 1`
+
+  if (!sinceTimestamp) {
+    return `
+  WITH ${ORG_ACCOUNTS},
+  ${CDP_MATCHED_SEGMENTS}
+  ${select}
+  ${dedup}`.trim()
+  }
+
+  return `
+  WITH ${ORG_ACCOUNTS},
+  ${CDP_MATCHED_SEGMENTS},
+  new_cdp_segments AS (
+    SELECT DISTINCT
+      s.SOURCE_ID AS sourceId,
+      s.slug
+    FROM ANALYTICS.SILVER_DIM._CROWD_DEV_SEGMENTS_UNION s
+    WHERE s.CREATED_TS >= '${sinceTimestamp}'
+      AND s.PARENT_SLUG IS NOT NULL
+      AND s.GRANDPARENTS_SLUG IS NOT NULL
+      AND s.SOURCE_ID IS NOT NULL
+  )
+
+  -- New certificates since last export
+  ${select}
+  AND c.issued_ts >= '${sinceTimestamp}'
+  ${dedup}
+
+  UNION
+
+  -- All certificates in newly created segments
+  ${select}
+  AND EXISTS (
+    SELECT 1 FROM new_cdp_segments ncs
+    WHERE ncs.slug = cms.slug AND ncs.sourceId = cms.sourceId
+  )
+  ${dedup}`.trim()
+}
diff --git a/services/apps/snowflake_connectors/src/integrations/tnc/certificates/transformer.ts b/services/apps/snowflake_connectors/src/integrations/tnc/certificates/transformer.ts
new file mode 100644
index 0000000000..f58ab2b44b
--- /dev/null
+++ b/services/apps/snowflake_connectors/src/integrations/tnc/certificates/transformer.ts
@@ -0,0 +1,83 @@
+import { TNC_GRID, TncActivityType } from '@crowd/integrations'
+import { getServiceChildLogger } from '@crowd/logging'
+import { IActivityData, MemberAttributeName, PlatformType } from '@crowd/types'
+
+import { TransformedActivity } from '../../../core/transformerBase'
+import { TncTransformerBase } from '../tncTransformerBase'
+
+const log = getServiceChildLogger('tncCertificatesTransformer')
+
+/** Turns rows of the TNC certificates export into issued-certification activities. */
+export class TncCertificatesTransformer extends TncTransformerBase {
+  /**
+   * Maps one exported certificate row to an activity and its target segment.
+   *
+   * @param row - exported row, read through its upper-case column names
+   * @returns the activity with its segment, or null when the row has no email,
+   *   no certificate id, or no project slug/id and is therefore skipped
+   */
+  transformRow(row: Record<string, unknown>): TransformedActivity | null {
+    const email = (row.USER_EMAIL as string | null)?.trim() || null
+    if (!email) {
+      log.debug({ certificateId: row.CERTIFICATE_ID }, 'Skipping row: missing email')
+      return null
+    }
+
+    // The certificate id becomes the activity sourceId, so a row without one is unusable
+    const certificateId = (row.CERTIFICATE_ID as string | null)?.trim() || null
+    if (!certificateId) {
+      log.warn('Skipping row: missing certificateId')
+      return null
+    }
+
+    const learnerName = (row.LEARNER_NAME as string | null)?.trim() || null
+    const lfUsername = (row.LFID as string | null)?.trim() || null
+    const userId = (row.USER_ID as string | null) || undefined
+
+    const activity: IActivityData = {
+      type: TncActivityType.ISSUED_CERTIFICATION,
+      platform: PlatformType.TNC,
+      timestamp: (row.ISSUED_TS as string | null) || null,
+      score: TNC_GRID[TncActivityType.ISSUED_CERTIFICATION].score,
+      sourceId: certificateId,
+      sourceParentId: (row.COURSE_ID as string | null) || undefined,
+      member: {
+        displayName: learnerName || email.split('@')[0],
+        identities: this.buildIdentities(email, lfUsername, userId),
+        organizations: this.buildOrganizations(row),
+        attributes: {
+          ...((row.JOB_TITLE as string | null) && {
+            [MemberAttributeName.JOB_TITLE]: { [PlatformType.TNC]: row.JOB_TITLE as string },
+          }),
+          ...((row.USER_COUNTRY as string | null) && {
+            [MemberAttributeName.COUNTRY]: { [PlatformType.TNC]: row.USER_COUNTRY as string },
+          }),
+        },
+      },
+      attributes: {
+        productName: (row.COURSE_NAME as string | null) || null,
+        technology: (row.TECHNOLOGIES_LIST as string | null) || null,
+        didExpire: row.DID_EXPIRE as boolean | null,
+        expirationDate: (row.EXPIRATION_DATE as string | null) || null,
+      },
+    }
+
+    const segmentSlug = (row.PROJECT_SLUG as string | null)?.trim() || null
+    const segmentSourceId = (row.PROJECT_ID as string | null)?.trim() || null
+    if (!segmentSlug || !segmentSourceId) {
+      log.warn(
+        {
+          certificateId,
+          segmentSlug,
+          segmentSourceId,
+          rawProjectSlug: row.PROJECT_SLUG,
+          rawProjectId: row.PROJECT_ID,
+        },
+        'Skipping row: missing segment slug or sourceId',
+      )
+      return null
+    }
+
+    return { activity, segment: { slug: segmentSlug, sourceId: segmentSourceId } }
+  }
+}
diff --git a/services/apps/snowflake_connectors/src/integrations/tnc/courses/buildSourceQuery.ts b/services/apps/snowflake_connectors/src/integrations/tnc/courses/buildSourceQuery.ts
new file mode 100644
index 0000000000..2473d49aaa
--- /dev/null
+++ b/services/apps/snowflake_connectors/src/integrations/tnc/courses/buildSourceQuery.ts
@@ -0,0 +1,130 @@
+import { IS_PROD_ENV } from '@crowd/common'
+
+// Main: analytics.bronze_census_ti.course_actions (course action data)
+// Joins:
+// - analytics.bronze_census_ti.users (user resolution via internal_ti_user_id)
+// - analytics.bronze_census_ti.courses (course metadata)
+// - analytics.silver_fact.enrollments (segment + org resolution via email + course_id)
+// - analytics.silver_dim._crowd_dev_segments_union (segment resolution)
+// - analytics.bronze_fivetran_salesforce.accounts + analytics.bronze_fivetran_salesforce_b2b.accounts (org data)
+
+const CDP_MATCHED_SEGMENTS = `
+  cdp_matched_segments AS (
+    SELECT DISTINCT
+      s.SOURCE_ID AS sourceId,
+      s.slug
+    FROM ANALYTICS.SILVER_DIM._CROWD_DEV_SEGMENTS_UNION s
+    WHERE s.PARENT_SLUG IS NOT NULL
+      AND s.GRANDPARENTS_SLUG IS NOT NULL
+      AND s.SOURCE_ID IS NOT NULL
+  )`
+
+const ORG_ACCOUNTS = `
+  org_accounts AS (
+    SELECT account_id, account_name, website, domain_aliases, LOGO_URL, INDUSTRY, N_EMPLOYEES
+    FROM analytics.bronze_fivetran_salesforce.accounts
+    WHERE website IS NOT NULL
+    UNION ALL
+    SELECT account_id, account_name, website, domain_aliases, NULL AS LOGO_URL, NULL AS INDUSTRY, NULL AS N_EMPLOYEES
+    FROM analytics.bronze_fivetran_salesforce_b2b.accounts
+    WHERE website IS NOT NULL
+  )`
+
+/**
+ * Builds the Snowflake export query for TNC course actions ('course_started' only).
+ *
+ * Without `sinceTimestamp` every matching action in a matched segment is selected. With
+ * it, the result is the UNION of (a) actions whose timestamp is at or after the given
+ * value and (b) all actions of segments created at or after it. Outside production the
+ * query is limited to the 'cncf' segment.
+ *
+ * NOTE(review): `sinceTimestamp` is spliced into the SQL as a quoted literal, so it is
+ * assumed to be a trusted, well-formed timestamp - confirm against the caller.
+ *
+ * @param sinceTimestamp - lower bound for an incremental export; omit for a full export
+ * @returns the SQL text with surrounding whitespace trimmed
+ */
+export const buildSourceQuery = (sinceTimestamp?: string): string => {
+  let select = `
+  SELECT
+    ca.*,
+    co.*,
+    tu.user_email,
+    tu.lfid,
+    tu.learner_name,
+    tu.user_country,
+    tu.job_title,
+    e.project_slug AS PROJECT_SLUG,
+    e.project_id AS PROJECT_ID,
+    e.account_id,
+    org.account_name AS ORGANIZATION_NAME,
+    org.website AS ORG_WEBSITE,
+    org.domain_aliases AS ORG_DOMAIN_ALIASES,
+    org.logo_url AS LOGO_URL,
+    org.industry AS ORGANIZATION_INDUSTRY,
+    CAST(org.n_employees AS VARCHAR) AS ORGANIZATION_SIZE
+  FROM analytics.bronze_census_ti.course_actions ca
+  INNER JOIN analytics.bronze_census_ti.users tu
+    ON ca.internal_ti_user_id = tu.internal_ti_user_id
+  INNER JOIN analytics.bronze_census_ti.courses co
+    ON ca.course_id = co.course_id
+  INNER JOIN analytics.silver_fact.enrollments e
+    ON e.course_id = ca.course_id
+    AND LOWER(e.user_email) = LOWER(tu.user_email)
+  INNER JOIN cdp_matched_segments cms
+    ON cms.slug = e.project_slug
+    AND cms.sourceId = e.project_id
+  LEFT JOIN org_accounts org
+    ON e.account_id = org.account_id
+  WHERE ca.type = 'status_change'
+    AND ca.source = 'course_started'
+    AND co.is_test_or_archived = false
+    AND tu.user_email IS NOT NULL`
+
+  // Limit to a single project in non-prod to avoid exporting all projects data
+  if (!IS_PROD_ENV) {
+    select += ` AND e.project_slug = 'cncf'`
+  }
+
+  // Keep one row per course action. DESC sorts NULLs first in Snowflake by default, so
+  // NULLS LAST is required for a row that carries an organization website to win.
+  const dedup = `
+  QUALIFY ROW_NUMBER() OVER (PARTITION BY ca.course_action_id ORDER BY org.website DESC NULLS LAST) = 1`
+
+  if (!sinceTimestamp) {
+    return `
+  WITH ${ORG_ACCOUNTS},
+  ${CDP_MATCHED_SEGMENTS}
+  ${select}
+  ${dedup}`.trim()
+  }
+
+  return `
+  WITH ${ORG_ACCOUNTS},
+  ${CDP_MATCHED_SEGMENTS},
+  new_cdp_segments AS (
+    SELECT DISTINCT
+      s.SOURCE_ID AS sourceId,
+      s.slug
+    FROM ANALYTICS.SILVER_DIM._CROWD_DEV_SEGMENTS_UNION s
+    WHERE s.CREATED_TS >= '${sinceTimestamp}'
+      AND s.PARENT_SLUG IS NOT NULL
+      AND s.GRANDPARENTS_SLUG IS NOT NULL
+      AND s.SOURCE_ID IS NOT NULL
+  )
+
+  -- New course actions since last export
+  ${select}
+  AND ca.timestamp >= '${sinceTimestamp}'
+  ${dedup}
+
+  UNION
+
+  -- All course actions in newly created segments
+  ${select}
+  AND EXISTS (
+    SELECT 1 FROM new_cdp_segments ncs
+    WHERE ncs.slug = cms.slug AND ncs.sourceId = cms.sourceId
+  )
+  ${dedup}`.trim()
+}
diff --git a/services/apps/snowflake_connectors/src/integrations/tnc/courses/transformer.ts b/services/apps/snowflake_connectors/src/integrations/tnc/courses/transformer.ts
new file mode 100644
index 0000000000..df214cace8
--- /dev/null
+++ b/services/apps/snowflake_connectors/src/integrations/tnc/courses/transformer.ts
@@ -0,0 +1,102 @@
+import { TNC_GRID, TncActivityType } from '@crowd/integrations'
+import { getServiceChildLogger } from '@crowd/logging'
+import { IActivityData, MemberAttributeName, PlatformType } from '@crowd/types'
+
+import { TransformedActivity } from '../../../core/transformerBase'
+import { TncTransformerBase } from '../tncTransformerBase'
+
+const log = getServiceChildLogger('tncCoursesTransformer')
+
+/** Turns TNC course-action rows into attempted-course / attempted-exam activities. */
+export class TncCoursesTransformer extends TncTransformerBase {
+  /**
+   * Maps one exported course-action row to an activity and its target segment.
+   *
+   * PRODUCT_TYPE selects the activity type: "training" gives attempted-course and
+   * "certification" gives attempted-exam (case-insensitive); anything else is skipped.
+   *
+   * @param row - exported row, read through its upper-case column names
+   * @returns the activity with its segment, or null when the row has no email, no
+   *   course action id, an unrecognized product type, or no project slug/id
+   */
+  transformRow(row: Record<string, unknown>): TransformedActivity | null {
+    const email = (row.USER_EMAIL as string | null)?.trim() || null
+    if (!email) {
+      log.warn(
+        { courseActionId: row.COURSE_ACTION_ID, rawUserEmail: row.USER_EMAIL },
+        'Skipping row: missing email',
+      )
+      return null
+    }
+
+    const courseActionId = (row.COURSE_ACTION_ID as string | null)?.trim() || null
+    if (!courseActionId) {
+      log.warn('Skipping row: missing courseActionId')
+      return null
+    }
+
+    const learnerName = (row.LEARNER_NAME as string | null)?.trim() || null
+    const lfUsername = (row.LFID as string | null)?.trim() || null
+    const productType = (row.PRODUCT_TYPE as string | null)?.trim() || null
+
+    let type: TncActivityType
+    if (productType?.toLowerCase() === 'training') {
+      type = TncActivityType.ATTEMPTED_COURSE
+    } else if (productType?.toLowerCase() === 'certification') {
+      type = TncActivityType.ATTEMPTED_EXAM
+    } else {
+      log.warn({ courseActionId, productType }, 'Skipping row: unrecognized product type')
+      return null
+    }
+
+    const activity: IActivityData = {
+      type,
+      platform: PlatformType.TNC,
+      timestamp: (row.TIMESTAMP as string | null) || null,
+      score: TNC_GRID[type].score,
+      sourceId: courseActionId,
+      sourceParentId: (row.COURSE_ID as string | null) || undefined,
+      member: {
+        displayName: learnerName || email.split('@')[0],
+        // No upstream user id is mapped for this source, so identities carry no sourceId
+        identities: this.buildIdentities(email, lfUsername),
+        organizations: this.buildOrganizations(row),
+        attributes: {
+          ...((row.JOB_TITLE as string | null) && {
+            [MemberAttributeName.JOB_TITLE]: { [PlatformType.TNC]: row.JOB_TITLE as string },
+          }),
+          ...((row.USER_COUNTRY as string | null) && {
+            [MemberAttributeName.COUNTRY]: { [PlatformType.TNC]: row.USER_COUNTRY as string },
+          }),
+        },
+      },
+      attributes: {
+        productName: (row.TITLE as string | null) || null,
+        productType,
+        parentProduct: (row.COURSE_GROUP_ID as string | null) || null,
+        courseSlug: (row.SLUG as string | null) || null,
+        instructionType: (row.INSTRUCTION_TYPE as string | null) || null,
+        isCertification: Boolean(row.IS_CERTIFICATION),
+        isTraining: Boolean(row.IS_TRAINING),
+      },
+    }
+
+    const segmentSlug = (row.PROJECT_SLUG as string | null)?.trim() || null
+    const segmentSourceId = (row.PROJECT_ID as string | null)?.trim() || null
+    if (!segmentSlug || !segmentSourceId) {
+      log.warn(
+        {
+          courseActionId,
+          segmentSlug,
+          segmentSourceId,
+          rawProjectSlug: row.PROJECT_SLUG,
+          rawProjectId: row.PROJECT_ID,
+        },
+        'Skipping row: missing segment slug or sourceId',
+      )
+      return null
+    }
+
+    return { activity, segment: { slug: segmentSlug, sourceId: segmentSourceId } }
+  }
+}
diff --git a/services/apps/snowflake_connectors/src/integrations/tnc/enrollments/buildSourceQuery.ts b/services/apps/snowflake_connectors/src/integrations/tnc/enrollments/buildSourceQuery.ts
new file mode 100644
index 0000000000..82d1be636a
--- /dev/null
+++ b/services/apps/snowflake_connectors/src/integrations/tnc/enrollments/buildSourceQuery.ts
@@ -0,0 +1,144 @@
+import { IS_PROD_ENV } from '@crowd/common'
+
+// Main: analytics.silver_fact.enrollments (enrollment data)
+// Joins:
+// - analytics.silver_dim._crowd_dev_segments_union (segment resolution)
+// - analytics.bronze_fivetran_salesforce.bronze_salesforce_merged_user (LFID)
+// - analytics.silver_dim.users (LFID fallback)
+// - analytics.bronze_fivetran_salesforce.accounts + analytics.bronze_fivetran_salesforce_b2b.accounts (org data)
+// - analytics.bronze_census_ti.course_actions + analytics.bronze_census_ti.users (course status)
+
+const CDP_MATCHED_SEGMENTS = `
+  cdp_matched_segments AS (
+    SELECT DISTINCT
+      s.SOURCE_ID AS sourceId,
+      s.slug
+    FROM ANALYTICS.SILVER_DIM._CROWD_DEV_SEGMENTS_UNION s
+    WHERE s.PARENT_SLUG IS NOT NULL
+      AND s.GRANDPARENTS_SLUG IS NOT NULL
+      AND s.SOURCE_ID IS NOT NULL
+  )`
+
+const ORG_ACCOUNTS = `
+  org_accounts AS (
+    SELECT account_id, account_name, website, domain_aliases, LOGO_URL, INDUSTRY, N_EMPLOYEES
+    FROM analytics.bronze_fivetran_salesforce.accounts
+    WHERE website IS NOT NULL
+    UNION ALL
+    SELECT account_id, account_name, website, domain_aliases, NULL AS LOGO_URL, NULL AS INDUSTRY, NULL AS N_EMPLOYEES
+    FROM analytics.bronze_fivetran_salesforce_b2b.accounts
+    WHERE website IS NOT NULL
+  )`
+
+const LFID_COALESCE = `COALESCE(mu.user_name, u.lf_username)`
+
+const COURSE_STATUS = `
+  course_status AS (
+    SELECT
+      tu.user_email,
+      ca.course_id,
+      MAX(CASE WHEN ca.source = 'course_started' THEN ca.timestamp END) AS COURSE_STARTED_DATE,
+      MAX(CASE WHEN ca.source = 'course_completed' THEN ca.timestamp END) AS COURSE_COMPLETED_DATE,
+      MAX_BY(ca.source, ca.timestamp) AS COURSE_STATUS
+    FROM analytics.bronze_census_ti.course_actions ca
+    INNER JOIN analytics.bronze_census_ti.users tu
+      ON ca.internal_ti_user_id = tu.internal_ti_user_id
+    WHERE ca.type = 'status_change'
+      AND ca.source IN ('course_started', 'course_completed')
+    GROUP BY tu.user_email, ca.course_id
+  )`
+
+/**
+ * Builds the Snowflake export query for TNC enrollments.
+ *
+ * Without `sinceTimestamp` every enrollment in a matched segment is selected. With it,
+ * the result is the UNION of (a) enrollments whose enrollment_ts is at or after the given
+ * value and (b) all enrollments of segments created at or after it. Outside production
+ * the query is limited to the 'cncf' segment.
+ *
+ * NOTE(review): `sinceTimestamp` is spliced into the SQL as a quoted literal, so it is
+ * assumed to be a trusted, well-formed timestamp - confirm against the caller.
+ *
+ * @param sinceTimestamp - lower bound for an incremental export; omit for a full export
+ * @returns the SQL text with surrounding whitespace trimmed
+ */
+export const buildSourceQuery = (sinceTimestamp?: string): string => {
+  let select = `
+  SELECT
+    e.*,
+    org.account_name AS ORGANIZATION_NAME,
+    org.website AS ORG_WEBSITE,
+    org.domain_aliases AS ORG_DOMAIN_ALIASES,
+    org.logo_url AS LOGO_URL,
+    org.industry AS ORGANIZATION_INDUSTRY,
+    CAST(org.n_employees AS VARCHAR) AS ORGANIZATION_SIZE,
+    ${LFID_COALESCE} AS LFID,
+    cs.COURSE_STARTED_DATE,
+    cs.COURSE_COMPLETED_DATE,
+    cs.COURSE_STATUS
+  FROM analytics.silver_fact.enrollments e
+  INNER JOIN cdp_matched_segments cms
+    ON cms.slug = e.project_slug
+    AND cms.sourceId = e.project_id
+  LEFT JOIN analytics.bronze_fivetran_salesforce.bronze_salesforce_merged_user mu
+    ON e.user_id = mu.user_id
+    AND mu.user_name IS NOT NULL
+  LEFT JOIN analytics.silver_dim.users u
+    ON LOWER(e.user_email) = LOWER(u.email)
+    AND u.lf_username IS NOT NULL
+  LEFT JOIN org_accounts org
+    ON e.account_id = org.account_id
+  LEFT JOIN course_status cs
+    ON LOWER(e.user_email) = LOWER(cs.user_email)
+    AND e.course_id = cs.course_id
+  WHERE e.user_email IS NOT NULL`
+
+  // Limit to a single project in non-prod to avoid exporting all projects data
+  if (!IS_PROD_ENV) {
+    select += ` AND e.project_slug = 'cncf'`
+  }
+
+  // Keep one row per enrollment. DESC sorts NULLs first in Snowflake by default, so
+  // NULLS LAST is required for a row that carries an organization website to win.
+  const dedup = `
+  QUALIFY ROW_NUMBER() OVER (PARTITION BY e.enrollment_id ORDER BY org.website DESC NULLS LAST) = 1`
+
+  if (!sinceTimestamp) {
+    return `
+  WITH ${ORG_ACCOUNTS},
+  ${CDP_MATCHED_SEGMENTS},
+  ${COURSE_STATUS}
+  ${select}
+  ${dedup}`.trim()
+  }
+
+  return `
+  WITH ${ORG_ACCOUNTS},
+  ${CDP_MATCHED_SEGMENTS},
+  ${COURSE_STATUS},
+  new_cdp_segments AS (
+    SELECT DISTINCT
+      s.SOURCE_ID AS sourceId,
+      s.slug
+    FROM ANALYTICS.SILVER_DIM._CROWD_DEV_SEGMENTS_UNION s
+    WHERE s.CREATED_TS >= '${sinceTimestamp}'
+      AND s.PARENT_SLUG IS NOT NULL
+      AND s.GRANDPARENTS_SLUG IS NOT NULL
+      AND s.SOURCE_ID IS NOT NULL
+  )
+
+  -- New enrollments since last export
+  ${select}
+  AND e.enrollment_ts >= '${sinceTimestamp}'
+  ${dedup}
+
+  UNION
+
+  -- All enrollments in newly created segments
+  ${select}
+  AND EXISTS (
+    SELECT 1 FROM new_cdp_segments ncs
+    WHERE ncs.slug = cms.slug AND ncs.sourceId = cms.sourceId
+  )
+  ${dedup}`.trim()
+}
diff --git a/services/apps/snowflake_connectors/src/integrations/tnc/enrollments/transformer.ts b/services/apps/snowflake_connectors/src/integrations/tnc/enrollments/transformer.ts
new file mode 100644
index 0000000000..6a19dcd84b
--- /dev/null
+++ b/services/apps/snowflake_connectors/src/integrations/tnc/enrollments/transformer.ts
@@ -0,0 +1,114 @@
+import { TNC_GRID, TncActivityType } from '@crowd/integrations'
+import { getServiceChildLogger } from '@crowd/logging'
+import { IActivityData, MemberAttributeName, PlatformType } from '@crowd/types'
+
+import { TransformedActivity } from '../../../core/transformerBase'
+import { TncTransformerBase } from '../tncTransformerBase'
+
+const log = getServiceChildLogger('tncEnrollmentsTransformer')
+
+/** Turns TNC enrollment rows into enrolled-certification / enrolled-training activities. */
+export class TncEnrollmentsTransformer extends TncTransformerBase {
+  /**
+   * Maps one exported enrollment row to an activity and its target segment.
+   *
+   * The activity type is enrolled-certification when PRODUCT_TYPE is "certification"
+   * and INSTRUCTION_TYPE is "certification exam", enrolled-training when PRODUCT_TYPE
+   * is "training" (both case-insensitive); any other combination is skipped.
+   *
+   * @param row - exported row, read through its upper-case column names
+   * @returns the activity with its segment, or null when the row has no email, no
+   *   enrollment id, an unrecognized product/instruction type, or no project slug/id
+   */
+  transformRow(row: Record<string, unknown>): TransformedActivity | null {
+    const email = (row.USER_EMAIL as string | null)?.trim() || null
+    if (!email) {
+      log.warn(
+        { enrollmentId: row.ENROLLMENT_ID, rawUserEmail: row.USER_EMAIL },
+        'Skipping row: missing email',
+      )
+      return null
+    }
+
+    // The enrollment id becomes the activity sourceId, so a row without one is unusable
+    const enrollmentId = (row.ENROLLMENT_ID as string | null)?.trim() || null
+    if (!enrollmentId) {
+      log.warn('Skipping row: missing enrollmentId')
+      return null
+    }
+
+    const learnerName = (row.LEARNER_NAME as string | null)?.trim() || null
+    const lfUsername = (row.LFID as string | null)?.trim() || null
+    const userId = (row.USER_ID as string | null) || undefined
+    const productType = (row.PRODUCT_TYPE as string | null)?.trim() || null
+    const instructionType = (row.INSTRUCTION_TYPE as string | null)?.trim() || null
+
+    let type: TncActivityType
+    if (
+      productType?.toLowerCase() === 'certification' &&
+      instructionType?.toLowerCase() === 'certification exam'
+    ) {
+      type = TncActivityType.ENROLLED_CERTIFICATION
+    } else if (productType?.toLowerCase() === 'training') {
+      type = TncActivityType.ENROLLED_TRAINING
+    } else {
+      log.warn(
+        { enrollmentId, productType, instructionType },
+        'Skipping row: unrecognized product/instruction type',
+      )
+      return null
+    }
+
+    const activity: IActivityData = {
+      type,
+      platform: PlatformType.TNC,
+      timestamp: (row.ENROLLMENT_TS as string | null) || null,
+      score: TNC_GRID[type].score,
+      sourceId: enrollmentId,
+      sourceParentId: (row.COURSE_ID as string | null) || undefined,
+      member: {
+        displayName: learnerName || email.split('@')[0],
+        identities: this.buildIdentities(email, lfUsername, userId),
+        organizations: this.buildOrganizations(row),
+        attributes: {
+          ...((row.LEARNER_TITLE as string | null) && {
+            [MemberAttributeName.JOB_TITLE]: { [PlatformType.TNC]: row.LEARNER_TITLE as string },
+          }),
+          ...((row.USER_COUNTRY as string | null) && {
+            [MemberAttributeName.COUNTRY]: { [PlatformType.TNC]: row.USER_COUNTRY as string },
+          }),
+        },
+      },
+      attributes: {
+        productName: (row.COURSE_NAME as string | null) || null,
+        productType,
+        technology: (row.TECHNOLOGIES_LIST as string | null) || null,
+        parentProduct: (row.COURSE_GROUP_ID as string | null) || null,
+        courseCode: (row.COURSE_CODE as string | null) || null,
+        instructionType,
+        location: (row.LOCATION as string | null) || null,
+        courseStatus: (row.COURSE_STATUS as string | null) || null,
+        courseStartedDate: (row.COURSE_STARTED_DATE as string | null) || null,
+        courseCompletedDate: (row.COURSE_COMPLETED_DATE as string | null) || null,
+      },
+    }
+
+    const segmentSlug = (row.PROJECT_SLUG as string | null)?.trim() || null
+    const segmentSourceId = (row.PROJECT_ID as string | null)?.trim() || null
+    if (!segmentSlug || !segmentSourceId) {
+      log.warn(
+        {
+          enrollmentId,
+          segmentSlug,
+          segmentSourceId,
+          rawProjectSlug: row.PROJECT_SLUG,
+          rawProjectId: row.PROJECT_ID,
+        },
+        'Skipping row: missing segment slug or sourceId',
+      )
+      return null
+    }
+
+    return { activity, segment: { slug: segmentSlug, sourceId: segmentSourceId } }
+  }
+}
diff --git a/services/apps/snowflake_connectors/src/integrations/tnc/tncTransformerBase.ts b/services/apps/snowflake_connectors/src/integrations/tnc/tncTransformerBase.ts
new file mode 100644
index 0000000000..e8350dc7e1
--- /dev/null
+++ b/services/apps/snowflake_connectors/src/integrations/tnc/tncTransformerBase.ts
@@ -0,0 +1,131 @@
+import {
+  IActivityData,
+  IMemberData,
+  IOrganizationIdentity,
+  MemberIdentityType,
+  OrganizationIdentityType,
+  OrganizationSource,
+  PlatformType,
+} from '@crowd/types'
+
+import { TransformerBase } from '../../core/transformerBase'
+
+/**
+ * Shared base for the TNC transformers: pins the platform to TNC and holds the
+ * member-identity and organization mapping that the TNC sources have in common.
+ */
+export abstract class TncTransformerBase extends TransformerBase {
+  readonly platform = PlatformType.TNC
+
+  /**
+   * Builds the verified member identities for a learner.
+   *
+   * With an LFID username: a TNC email identity, a TNC username identity and an
+   * LFID username identity. Without one: the email is emitted as the only TNC
+   * username identity (no separate email identity is produced).
+   *
+   * @param email - trimmed, non-empty learner email
+   * @param lfUsername - trimmed LFID username, or null when the row has none
+   * @param sourceId - value copied onto every identity's `sourceId`; may be undefined
+   * @returns the identity list for the activity's member
+   */
+  protected buildIdentities(
+    email: string,
+    lfUsername: string | null,
+    sourceId?: string,
+  ): IMemberData['identities'] {
+    if (!lfUsername) {
+      return [
+        {
+          platform: PlatformType.TNC,
+          value: email,
+          type: MemberIdentityType.USERNAME,
+          verified: true,
+          sourceId,
+        },
+      ]
+    }
+
+    return [
+      {
+        platform: PlatformType.TNC,
+        value: email,
+        type: MemberIdentityType.EMAIL,
+        verified: true,
+        sourceId,
+      },
+      {
+        platform: PlatformType.TNC,
+        value: lfUsername,
+        type: MemberIdentityType.USERNAME,
+        verified: true,
+        sourceId,
+      },
+      {
+        platform: PlatformType.LFID,
+        value: lfUsername,
+        type: MemberIdentityType.USERNAME,
+        verified: true,
+        sourceId,
+      },
+    ]
+  }
+
+  /**
+   * Builds the member's organization from the ORG_* / ORGANIZATION_* / LOGO_URL columns.
+   *
+   * @param row - exported row; only the organization columns are read
+   * @returns a single-element organization list, or undefined when the row yields
+   *   no domain identity (no website and no non-blank alias)
+   */
+  protected buildOrganizations(
+    row: Record<string, unknown>,
+  ): IActivityData['member']['organizations'] {
+    const website = (row.ORG_WEBSITE as string | null)?.trim() || null
+    // Aliases arrive as one comma-separated string; drop blank entries such as "a,,b" or ","
+    const aliases = ((row.ORG_DOMAIN_ALIASES as string | null) ?? '')
+      .split(',')
+      .map((alias) => alias.trim())
+      .filter(Boolean)
+
+    // Without any domain there is nothing to identify the organization by
+    if (!website && aliases.length === 0) {
+      return undefined
+    }
+
+    const identities: IOrganizationIdentity[] = []
+
+    if (website) {
+      identities.push({
+        platform: PlatformType.TNC,
+        value: website,
+        type: OrganizationIdentityType.PRIMARY_DOMAIN,
+        verified: true,
+      })
+    }
+
+    for (const alias of aliases) {
+      identities.push({
+        platform: PlatformType.TNC,
+        value: alias,
+        type: OrganizationIdentityType.ALTERNATIVE_DOMAIN,
+        verified: true,
+      })
+    }
+
+    return [
+      {
+        // Fall back to the website, then the first alias, so the name is never null
+        displayName: (row.ORGANIZATION_NAME as string | null)?.trim() || website || aliases[0],
+        source: OrganizationSource.TNC,
+        identities,
+        logo: (row.LOGO_URL as string | null)?.trim() || undefined,
+        size:
+          typeof row.ORGANIZATION_SIZE === 'string'
+            ? row.ORGANIZATION_SIZE.trim() || undefined
+            : undefined,
+        industry: (row.ORGANIZATION_INDUSTRY as string | null)?.trim() || undefined,
+      },
+    ]
+  }
+}
diff --git a/services/apps/snowflake_connectors/src/integrations/types.ts b/services/apps/snowflake_connectors/src/integrations/types.ts
index 01208e1783..36377a4f4f 100644
--- a/services/apps/snowflake_connectors/src/integrations/types.ts
+++ b/services/apps/snowflake_connectors/src/integrations/types.ts
@@ -5,6 +5,9 @@ export type BuildSourceQuery = (sinceTimestamp?: string) => string
 // Each data source maps to a distinct Snowflake table (or joined set of tables) that is exported and transformed independently.
 export enum DataSourceName {
   CVENT_EVENT_REGISTRATIONS = 'event-registrations',
+  TNC_ENROLLMENTS = 'enrollments',
+  TNC_CERTIFICATES = 'certificates',
+  TNC_COURSES = 'courses',
 }
 
 export interface DataSource {
diff --git a/services/libs/data-access-layer/src/organizations/attributesConfig.ts b/services/libs/data-access-layer/src/organizations/attributesConfig.ts
index a1f98261f7..c8d0414c3c 100644
--- a/services/libs/data-access-layer/src/organizations/attributesConfig.ts
+++ b/services/libs/data-access-layer/src/organizations/attributesConfig.ts
@@ -232,6 +232,7 @@ export const ORG_DB_ATTRIBUTE_SOURCE_PRIORITY = [
   OrganizationAttributeSource.ENRICHMENT_LFX_INTERNAL_API,
   OrganizationAttributeSource.ENRICHMENT_PEOPLEDATALABS,
   OrganizationAttributeSource.CVENT,
+  OrganizationAttributeSource.TNC,
   // legacy - keeping this for backward compatibility
   OrganizationAttributeSource.ENRICHMENT,
   OrganizationAttributeSource.GITHUB,
diff --git a/services/libs/integrations/src/integrations/index.ts b/services/libs/integrations/src/integrations/index.ts
index f58014ec98..3db691cd2b 100644
--- a/services/libs/integrations/src/integrations/index.ts
+++ b/services/libs/integrations/src/integrations/index.ts
@@ -50,4 +50,6 @@ export * from './groupsio/memberAttributes'
 
 export * from './cvent/types'
 
+export * from './tnc/types'
+
 export * from './activityDisplayService'
diff --git a/services/libs/integrations/src/integrations/tnc/types.ts b/services/libs/integrations/src/integrations/tnc/types.ts
new file mode 100644
index 0000000000..5c97ad6c02
--- /dev/null
+++ b/services/libs/integrations/src/integrations/tnc/types.ts
@@ -0,0 +1,19 @@
+import { IActivityScoringGrid } from '@crowd/types'
+
+/** TNC activity types; the values mirror the rows of the addTncActivityTypes migration. */
+export enum TncActivityType {
+  ENROLLED_CERTIFICATION = 'enrolled-certification',
+  ENROLLED_TRAINING = 'enrolled-training',
+  ISSUED_CERTIFICATION = 'issued-certification',
+  ATTEMPTED_COURSE = 'attempted-course',
+  ATTEMPTED_EXAM = 'attempted-exam',
+}
+
+/** Scoring grid for TNC activities; every type currently scores 1. */
+export const TNC_GRID: Record<TncActivityType, IActivityScoringGrid> = {
+  [TncActivityType.ENROLLED_CERTIFICATION]: { score: 1 },
+  [TncActivityType.ENROLLED_TRAINING]: { score: 1 },
+  [TncActivityType.ISSUED_CERTIFICATION]: { score: 1 },
+  [TncActivityType.ATTEMPTED_COURSE]: { score: 1 },
+  [TncActivityType.ATTEMPTED_EXAM]: { score: 1 },
+}
diff --git a/services/libs/types/src/enums/organizations.ts b/services/libs/types/src/enums/organizations.ts
index 24dd1cceeb..3dc62f28dc 100644
--- a/services/libs/types/src/enums/organizations.ts
+++ b/services/libs/types/src/enums/organizations.ts
@@ -13,6 +13,7 @@ export enum OrganizationSource {
   GITHUB = 'github',
   UI = 'ui',
   CVENT = 'cvent',
+  TNC = 'tnc',
 }
 
 export enum OrganizationMergeSuggestionType {
@@ -40,6 +41,7 @@ export enum OrganizationAttributeSource {
   ENRICHMENT_LFX_INTERNAL_API = 'enrichment-lfx-internal-api',
   ENRICHMENT_PEOPLEDATALABS = 'enrichment-peopledatalabs',
   CVENT = 'cvent',
+  TNC = 'tnc',
   // legacy - keeping this for backward compatibility
   ENRICHMENT = 'enrichment',
   GITHUB = 'github',